自顶向下深入分析Netty(九)--UnpooledByteBuf源码分析

前文分析了ByteBuf的抽象类实现框架,现在开始分析最底层的实现类。分为两种情形:Unpooled和Pooled,首先看Unpooled。

1.UnpooledHeapByteBuf

该Bytebuf的底层为不使用对象池技术的JAVA堆字节数组,首先看其中的成员变量:

    private final ByteBufAllocator alloc;   // 分配器
    byte[] array;   // 底层字节数组
    private ByteBuffer tmpNioBuf; // NIO的ByteBuffer形式

只需要着重关注array变量,它是位于JAVA堆的字节数组。
再看一个构造方法(忽略其中的参数检查):

    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, 
                                    int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        this.alloc = alloc;
        setArray(allocateArray(initialCapacity));
        setIndex(0, 0);
    }
    
    private void setArray(byte[] initialArray) {
        array = initialArray;
        tmpNioBuf = null;
    }
    
    byte[] allocateArray(int initialCapacity) {
        return new byte[initialCapacity];
    }

实现也很简单,只需关注allocateArray()方法,分配一个数组;对应地,有一个freeArray()方法,释放一个数组,代码如下:、

    void freeArray(byte[] array) {
        // NOOP 
    }

由于堆内的字节数组会被GC自动回收,所以不需要具体实现代码。此外,在引用计数的分析中,当引用计数释放的时候需要调用deallocate()方法释放该ByteBuf,实现如下:

    protected void deallocate() {
        freeArray(array);
        array = null;
    }

同理,使用GC自动回收,而设置array=null可以帮助GC回收。
ByteBuf中有关于判断底层实现的方法,具体实现也很简单:

    // 默认的字节序:大端模式
    public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
    
    // 底层是否有JAVA堆字节数组
    public boolean hasArray() { return true; }
    
    // 底层数组的偏移量
    public int arrayOffset() { return 0; }

    // 是否直接数组
    public boolean isDirect() { return false; }
    
    // 是否含有os底层的数组起始地址
    public boolean hasMemoryAddress() { return false; }

接下来,看重要的设置容量方法capacity(int newCapacity)

    public ByteBuf capacity(int newCapacity) {
        int oldCapacity = array.length;
        byte[] oldArray = array;
        if (newCapacity > oldCapacity) {    // 容量扩增
            byte[] newArray = allocateArray(newCapacity); // 申请数组
            // 将老数组的字节复制到新数组
            System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
            setArray(newArray);
            freeArray(oldArray);
        } else if (newCapacity < oldCapacity) { // 容量缩减
            byte[] newArray = allocateArray(newCapacity);
            int readerIndex = readerIndex();
            // 容量缩减导致读写索引改变
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                // 只拷贝读索引之后的数据,读索引之前0填充
                System.arraycopy(oldArray, readerIndex, 
                                 newArray, readerIndex, writerIndex - readerIndex);
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
            freeArray(oldArray);
        }
        // 容量相等时不做处理
        return this;
    }

设置容量分为两种情况:容量扩增和容量缩减。实现都是将老数据复制到新的字节数组中,有必要的话,调整读写索引位置。
之前分析过getXXX()readXXX()的核心实现是_getXXX(index)方法,以_getInt(index)为例进行分析,代码如下:

    protected int _getInt(int index) {
        return HeapByteBufUtil.getInt(array, index);
    }
    
    static int getInt(byte[] memory, int index) {
        return  (memory[index]     & 0xff) << 24 |
                (memory[index + 1] & 0xff) << 16 |
                (memory[index + 2] & 0xff) <<  8 |
                memory[index + 3] & 0xff;
    }

将字节数组中指定索引位置处的4个字节按照大端模式通过移位组装为一个整数。同理,可推断_setInt(index)方法将一个整数的4个字节通过移位填充到字节数组的指定位置,确实如此,核心实现如下:

    static void setInt(byte[] memory, int index, int value) {
        memory[index]     = (byte) (value >>> 24);
        memory[index + 1] = (byte) (value >>> 16);
        memory[index + 2] = (byte) (value >>> 8);
        memory[index + 3] = (byte) value;
    }

可以派生新的ByteBuf的方法中,slice()duplicate()共享底层实现,在本类中,就是共享array变量,但各自维护独立索引,而copy()方法有自己独立的底层字节数组,通过将数据复制到一个新的字节数组实现,代码如下:

    public ByteBuf copy(int index, int length) {
        checkIndex(index, length);
        byte[] copiedArray = new byte[length];
        System.arraycopy(array, index, copiedArray, 0, length);
        return new UnpooledHeapByteBuf(alloc(), copiedArray, maxCapacity());
    }

虽然JDK自带的ByteBuffer有各种缺憾,但在进行IO时,不得不使用原生的ByteBuffer,所以Netty的ByteBuf也提供方法转化,实现如下:

    public ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return (ByteBuffer) internalNioBuffer().clear()
                                .position(index).limit(index + length);
    }
    
    private ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) {
            this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
        }
        return tmpNioBuf;
    }

方法将该类转化为JDK的HeapByteBuffer,可见也是一个堆缓冲区。clear().position(index).limit(index + length)的使用是防止原生ByteBuffer的读写模式切换造成的错误。

至此,UnpooledHeapByteBuf的实现分析完毕,可见并没有想象中的困难,再接再厉,分析UnpooledDirectByteBuf

2. UnpooledDirectByteBuf

Netty的UnpooledDirectByteBuf在NIO的DirectByteBuf上采用组合的方式进行了封装,屏蔽了对程序员不友好的地方,并使其符合Netty的ByteBuf体系。使用与UnpooledHeapByteBuf相同的顺序进行分析,首先看成员变量:

    private final ByteBufAllocator alloc;   // 分配器

    private ByteBuffer buffer;  // 底层NIO直接ByteBuffer
    private ByteBuffer tmpNioBuf; // 用于IO操作的ByteBuffer
    private int capacity; // ByteBuf的容量
    private boolean doNotFree; // 释放标记

做一个简介,buffer表示底层的直接ByteBuffer;tmpNioBuf常用来进行IO操作,实现实质是buffer.duplicate()即与buffer共享底层数据结构;capacity表示缓冲区容量,即字节数;doNotFree是一个标记,表示是否需要释放buffer的底层内存。

接着分析构造方法:

    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, 
                                int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity));
    }
    
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
    
    private void setByteBuffer(ByteBuffer buffer) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }

        this.buffer = buffer;
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }

由于setByteBuffer(buffer)中含有doNotFree变量使得理解稍微困难.仔细分析,当doNotFree为true时,调用后置为false,而为false时都需要freeDirect(oldBuffer)。由此可知,doNotFree表示不需要释放旧的Buffer,根据代码大全,使用反义Not并不是好的做法,使用free表示是否需要释放旧的Buffer会更容易让人理解。另外从代码可以看出:不需要释放旧的Buffer只有一种情况,这种情况便是Buffer作为构造方法的参数时,代码如下:

    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, 
                                      ByteBuffer initialBuffer, int maxCapacity) {
        super(maxCapacity);
        int initialCapacity = initialBuffer.remaining();

        this.alloc = alloc;
        doNotFree = true;   // 置为true 表示不需要释放原有buffer
        setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
        // 此时 doNotFree已经为false
        writerIndex(initialCapacity);
    }

分析完,发现doNotFree是一个不必要的变量,除非在执行构造方法的时候,oldBuffer不为null。(目前没想到有什么情况如此)
使用allocateDirect(initialCapacity)分配内存时实际委托给NIO的方法,释放内存freeDirect(buffer)也如此,委托给了NIO中DirectByteBuffer的cleaner,代码如下:

    protected void freeDirect(ByteBuffer buffer) {
        PlatformDependent.freeDirectBuffer(buffer);
    }
    
    public void freeDirectBuffer(ByteBuffer buffer) {
        if (!buffer.isDirect()) {
            return;
        }
        try {
            Object cleaner = PlatformDependent0.getObject(buffer, CLEANER_FIELD_OFFSET);
            if (cleaner != null) {
                CLEAN_METHOD.invoke(cleaner);
            }
        } catch (Throwable cause) {
            PlatformDependent0.throwException(cause);
        }
    }

实际代码根据JDK版本不同调用不同方法,上述只是其中之一,但原理相同,不再列出。
与引用计数相关的deallocate()方法,代码实现如下:

    protected void deallocate() {
        ByteBuffer buffer = this.buffer;
        if (buffer == null) {
            return;
        }

        this.buffer = null;

        if (!doNotFree) { 
            freeDirect(buffer); // 前述分析可知,doNotFree构造方法之后一直为false
        }
    }

判断底层实现的方法则如下:

    // 默认的字节序:大端模式
    public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
    
    // 是否直接数组
    public boolean isDirect() { return true; }
    
    // 底层是否有JAVA堆字节数组
    public boolean hasArray() { throw new UnsupportedOperationException("..."); }
    
    // 底层数组的偏移量
    public int arrayOffset() { throw new UnsupportedOperationException("..."); }

    // 是否含有os底层的数组起始地址
    public boolean hasMemoryAddress() { return false; }

设置容量的方法:

    public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int readerIndex = readerIndex();
        int writerIndex = writerIndex();

        int oldCapacity = capacity;
        if (newCapacity > oldCapacity) {    // 容量扩增
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            oldBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.put(oldBuffer);
            newBuffer.clear();
            setByteBuffer(newBuffer);
        } else if (newCapacity < oldCapacity) {  // 容量缩减
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                oldBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.put(oldBuffer);
                newBuffer.clear();
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setByteBuffer(newBuffer);
        }
        return this;
    }

与HeapByteBuf类似,容量改变时,都将oldBuffer中的数据复制到新的newBuffer中,只是在容量缩减时,需要调整读写索引。
接着看关键的_getInt(index)_setInt(index,value)方法:

    protected int _getInt(int index) {
        return buffer.getInt(index);
    }
    
    protected void _setInt(int index, int value) {
        buffer.putInt(index, value);
    }

可见具体实现委托给了NIO原生的ByteBuffer,追踪其中的具体实现,一种情况下的实现如下:

    
    static int getIntB(long a) {
        return makeInt(_get(a    ),
                       _get(a + 1),
                       _get(a + 2),
                       _get(a + 3));
    }
    
    static private int makeInt(byte b3, byte b2, byte b1, byte b0) {
        return (((b3       ) << 24) |
                ((b2 & 0xff) << 16) |
                ((b1 & 0xff) <<  8) |
                ((b0 & 0xff)      ));
    }

可见与Netty的HeapByteBuf实现一致。另一种情况是native实现,没有找到具体实现代码,如果你有兴趣可以寻找相关实现,有相关发现请告诉我。
继续看copy()方法:

    public ByteBuf copy(int index, int length) {
        ensureAccessible();
        ByteBuffer src;
        try {
            src = (ByteBuffer) buffer.duplicate()
                        .clear().position(index).limit(index + length);
        } catch (IllegalArgumentException ignored) {
            throw new IndexOutOfBoundsException(
                    "Too many bytes to read - Need " + (index + length));
        }

        return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
    }

对原buffer使用duplicate()方法,从而不干扰原来buffer的索引。然后从分配器中申请一个buffer并写入原buffer的数据。
最后看internalNioBuffer()

    public ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return (ByteBuffer) internalNioBuffer()
                    .clear().position(index).limit(index + length);
    }

    private ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) {
            this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
        }
        return tmpNioBuf;
    }

可见,与copy()相同,使用duplicate()防止干扰原buffer的索引。
至此,UnpooledDirectByteBuf的源码分析完毕。

3. UnsafeByteBuf

Netty还使用JAVA的后门类sun.misc.Unsafe实现了两个缓冲区UnpooledUnsafeHeapByteBufUnpooledUnsafeDirectByteBuf。这个强大的后门类Unsafe可以暴露出对象的底层地址,一般不建议使用,而性能优化狂魔Netty则顾不得这些。简单介绍一下这两个类的原理,不再对代码进行分析。UnpooledUnsafeHeapByteBuf在使用Unsafe后,暴露出字节数组在JAVA堆中的地址,所以不再使用字节数组的索引即array[index]访问,转而使用baseAddress + Index的得到字节的地址,然后从该地址取得字节。UnpooledUnsafeDirectByteBuf也一样,暴露底层DirectByteBuffer的地址后,使用相同的Address + Index方式取得对应字节。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容