在前文分析了ByteBuf的抽象类实现框架,现在开始分析最底层的实现类。分为两种情形:Unpooled和Pooled,首先看Unpooled。
1.UnpooledHeapByteBuf
该Bytebuf的底层为不使用对象池技术的JAVA堆字节数组,首先看其中的成员变量:
private final ByteBufAllocator alloc; // 分配器
byte[] array; // 底层字节数组
private ByteBuffer tmpNioBuf; // NIO的ByteBuffer形式
只需要着重关注array
变量,它是位于JAVA堆的字节数组。
再看一个构造方法(忽略其中的参数检查):
protected UnpooledHeapByteBuf(ByteBufAllocator alloc,
int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
实现也很简单,只需关注allocateArray()
方法,分配一个数组;对应地,有一个freeArray()
方法,释放一个数组,代码如下:、
void freeArray(byte[] array) {
// NOOP
}
由于堆内的字节数组会被GC自动回收,所以不需要具体实现代码。此外,在引用计数的分析中,当引用计数释放的时候需要调用deallocate()
方法释放该ByteBuf,实现如下:
protected void deallocate() {
freeArray(array);
array = null;
}
同理,使用GC自动回收,而设置array=null
可以帮助GC回收。
ByteBuf中有关于判断底层实现的方法,具体实现也很简单:
// 默认的字节序:大端模式
public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
// 底层是否有JAVA堆字节数组
public boolean hasArray() { return true; }
// 底层数组的偏移量
public int arrayOffset() { return 0; }
// 是否直接数组
public boolean isDirect() { return false; }
// 是否含有os底层的数组起始地址
public boolean hasMemoryAddress() { return false; }
接下来,看重要的设置容量方法capacity(int newCapacity)
:
public ByteBuf capacity(int newCapacity) {
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) { // 容量扩增
byte[] newArray = allocateArray(newCapacity); // 申请数组
// 将老数组的字节复制到新数组
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) { // 容量缩减
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
// 容量缩减导致读写索引改变
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
// 只拷贝读索引之后的数据,读索引之前0填充
System.arraycopy(oldArray, readerIndex,
newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
// 容量相等时不做处理
return this;
}
设置容量分为两种情况:容量扩增和容量缩减。实现都是将老数据复制到新的字节数组中,有必要的话,调整读写索引位置。
之前分析过getXXX()
和readXXX()
的核心实现是_getXXX(index)
方法,以_getInt(index)
为例进行分析,代码如下:
protected int _getInt(int index) {
return HeapByteBufUtil.getInt(array, index);
}
static int getInt(byte[] memory, int index) {
return (memory[index] & 0xff) << 24 |
(memory[index + 1] & 0xff) << 16 |
(memory[index + 2] & 0xff) << 8 |
memory[index + 3] & 0xff;
}
将字节数组中指定索引位置处的4个字节按照大端模式通过移位组装为一个整数。同理,可推断_setInt(index)
方法将一个整数的4个字节通过移位填充到字节数组的指定位置,确实如此,核心实现如下:
static void setInt(byte[] memory, int index, int value) {
memory[index] = (byte) (value >>> 24);
memory[index + 1] = (byte) (value >>> 16);
memory[index + 2] = (byte) (value >>> 8);
memory[index + 3] = (byte) value;
}
可以派生新的ByteBuf的方法中,slice()
和duplicate()
共享底层实现,在本类中,就是共享array
变量,但各自维护独立索引,而copy()
方法有自己独立的底层字节数组,通过将数据复制到一个新的字节数组实现,代码如下:
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
byte[] copiedArray = new byte[length];
System.arraycopy(array, index, copiedArray, 0, length);
return new UnpooledHeapByteBuf(alloc(), copiedArray, maxCapacity());
}
虽然JDK自带的ByteBuffer
有各种缺憾,但在进行IO时,不得不使用原生的ByteBuffer
,所以Netty的ByteBuf
也提供方法转化,实现如下:
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return (ByteBuffer) internalNioBuffer().clear()
.position(index).limit(index + length);
}
private ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
}
return tmpNioBuf;
}
方法将该类转化为JDK的HeapByteBuffer
,可见也是一个堆缓冲区。clear().position(index).limit(index + length)
的使用是防止原生ByteBuffer
的读写模式切换造成的错误。
至此,UnpooledHeapByteBuf
的实现分析完毕,可见并没有想象中的困难,再接再厉,分析UnpooledDirectByteBuf
。
2. UnpooledDirectByteBuf
Netty的UnpooledDirectByteBuf
在NIO的DirectByteBuf
上采用组合的方式进行了封装,屏蔽了对程序员不友好的地方,并使其符合Netty的ByteBuf
体系。使用与UnpooledHeapByteBuf
相同的顺序进行分析,首先看成员变量:
private final ByteBufAllocator alloc; // 分配器
private ByteBuffer buffer; // 底层NIO直接ByteBuffer
private ByteBuffer tmpNioBuf; // 用于IO操作的ByteBuffer
private int capacity; // ByteBuf的容量
private boolean doNotFree; // 释放标记
做一个简介,buffer
表示底层的直接ByteBuffer;tmpNioBuf
常用来进行IO操作,实现实质是buffer.duplicate()
即与buffer
共享底层数据结构;capacity
表示缓冲区容量,即字节数;doNotFree
是一个标记,表示是否需要释放buffer
的底层内存。
接着分析构造方法:
protected UnpooledDirectByteBuf(ByteBufAllocator alloc,
int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
由于setByteBuffer(buffer)
中含有doNotFree
变量使得理解稍微困难.仔细分析,当doNotFree
为true时,调用后置为false,而为false时都需要freeDirect(oldBuffer)
。由此可知,doNotFree
表示不需要释放旧的Buffer,根据代码大全,使用反义Not并不是好的做法,使用free
表示是否需要释放旧的Buffer会更容易让人理解。另外从代码可以看出:不需要释放旧的Buffer只有一种情况,这种情况便是Buffer作为构造方法的参数时,代码如下:
protected UnpooledDirectByteBuf(ByteBufAllocator alloc,
ByteBuffer initialBuffer, int maxCapacity) {
super(maxCapacity);
int initialCapacity = initialBuffer.remaining();
this.alloc = alloc;
doNotFree = true; // 置为true 表示不需要释放原有buffer
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
// 此时 doNotFree已经为false
writerIndex(initialCapacity);
}
分析完,发现doNotFree
是一个不必要的变量,除非在执行构造方法的时候,oldBuffer不为null。(目前没想到有什么情况如此)
使用allocateDirect(initialCapacity)
分配内存时实际委托给NIO的方法,释放内存freeDirect(buffer)
也如此,委托给了NIO中DirectByteBuffer的cleaner,代码如下:
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
public void freeDirectBuffer(ByteBuffer buffer) {
if (!buffer.isDirect()) {
return;
}
try {
Object cleaner = PlatformDependent0.getObject(buffer, CLEANER_FIELD_OFFSET);
if (cleaner != null) {
CLEAN_METHOD.invoke(cleaner);
}
} catch (Throwable cause) {
PlatformDependent0.throwException(cause);
}
}
实际代码根据JDK版本不同调用不同方法,上述只是其中之一,但原理相同,不再列出。
与引用计数相关的deallocate()
方法,代码实现如下:
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
if (!doNotFree) {
freeDirect(buffer); // 前述分析可知,doNotFree构造方法之后一直为false
}
}
判断底层实现的方法则如下:
// 默认的字节序:大端模式
public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
// 是否直接数组
public boolean isDirect() { return true; }
// 底层是否有JAVA堆字节数组
public boolean hasArray() { throw new UnsupportedOperationException("..."); }
// 底层数组的偏移量
public int arrayOffset() { throw new UnsupportedOperationException("..."); }
// 是否含有os底层的数组起始地址
public boolean hasMemoryAddress() { return false; }
设置容量的方法:
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int readerIndex = readerIndex();
int writerIndex = writerIndex();
int oldCapacity = capacity;
if (newCapacity > oldCapacity) { // 容量扩增
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
oldBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.put(oldBuffer);
newBuffer.clear();
setByteBuffer(newBuffer);
} else if (newCapacity < oldCapacity) { // 容量缩减
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
oldBuffer.position(readerIndex).limit(writerIndex);
newBuffer.position(readerIndex).limit(writerIndex);
newBuffer.put(oldBuffer);
newBuffer.clear();
} else {
setIndex(newCapacity, newCapacity);
}
setByteBuffer(newBuffer);
}
return this;
}
与HeapByteBuf类似,容量改变时,都将oldBuffer中的数据复制到新的newBuffer中,只是在容量缩减时,需要调整读写索引。
接着看关键的_getInt(index)
和_setInt(index,value)
方法:
protected int _getInt(int index) {
return buffer.getInt(index);
}
protected void _setInt(int index, int value) {
buffer.putInt(index, value);
}
可见具体实现委托给了NIO原生的ByteBuffer,追踪其中的具体实现,一种情况下的实现如下:
static int getIntB(long a) {
return makeInt(_get(a ),
_get(a + 1),
_get(a + 2),
_get(a + 3));
}
static private int makeInt(byte b3, byte b2, byte b1, byte b0) {
return (((b3 ) << 24) |
((b2 & 0xff) << 16) |
((b1 & 0xff) << 8) |
((b0 & 0xff) ));
}
可见与Netty的HeapByteBuf
实现一致。另一种情况是native实现,没有找到具体实现代码,如果你有兴趣可以寻找相关实现,有相关发现请告诉我。
继续看copy()
方法:
public ByteBuf copy(int index, int length) {
ensureAccessible();
ByteBuffer src;
try {
src = (ByteBuffer) buffer.duplicate()
.clear().position(index).limit(index + length);
} catch (IllegalArgumentException ignored) {
throw new IndexOutOfBoundsException(
"Too many bytes to read - Need " + (index + length));
}
return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
}
对原buffer使用duplicate()
方法,从而不干扰原来buffer的索引。然后从分配器中申请一个buffer并写入原buffer的数据。
最后看internalNioBuffer()
:
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return (ByteBuffer) internalNioBuffer()
.clear().position(index).limit(index + length);
}
private ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
}
return tmpNioBuf;
}
可见,与copy()
相同,使用duplicate()
防止干扰原buffer的索引。
至此,UnpooledDirectByteBuf
的源码分析完毕。
3. UnsafeByteBuf
Netty还使用JAVA的后门类sun.misc.Unsafe
实现了两个缓冲区UnpooledUnsafeHeapByteBuf
和UnpooledUnsafeDirectByteBuf
。这个强大的后门类Unsafe
可以暴露出对象的底层地址,一般不建议使用,而性能优化狂魔Netty则顾不得这些。简单介绍一下这两个类的原理,不再对代码进行分析。UnpooledUnsafeHeapByteBuf
在使用Unsafe
后,暴露出字节数组在JAVA堆中的地址,所以不再使用字节数组的索引即array[index]访问,转而使用baseAddress + Index的得到字节的地址,然后从该地址取得字节。UnpooledUnsafeDirectByteBuf
也一样,暴露底层DirectByteBuffer的地址后,使用相同的Address + Index方式取得对应字节。