概述
- 就像我们所知道的,Flink采用了自己管理内存的方式,或者说积极的内存管理,而MemorySegment就是Flink实现自己内存管理的核心。
- 那么MemorySegment是什么,有什么作用呢,具体实现呢?
- 1、Flink 并不是将大量对象分配在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块就做
MemorySegment
。
- 2、
MemorySegment
代表了一段固定长度的内存(默认大小为 32KB),也是 Flink中最小的内存分配单元,并且提供了非常高效的读写方法。
- 3、可以把 MemorySegment 想象成是为 Flink 定制的
java.nio.ByteBuffer
。它的底层可以是一个普通的Java字节数组(byte[])
,也可以是一个申请在堆外的 ByteBuffer
。
- 4、每条Record都会以序列化的形式存储在一个或多个MemorySegment中。
- 5、
MemorySegment
提供了一些从特定位置获取int,long,byte arrays,compare and copy memory的相关方法. put and get at specific positions ints, longs, byte arrays, etc, and compare and copy memory.
Understanding the JIT and tuning the implementation
- 在之前MemorySegment是一个独立、final的类。通过类层次结构分析(Class Hierarchy Analysis,CHA), JIT编译器能够确定所有访问方法调用都指向一个特定的实现。这样,所有方法调用都可以完全去虚化(de-virtualized)和内联,这是对性能进行所有进一步优化(比如调用循环的向量化)的基础。
- 如果JIT不能执行相同的优化,这将导致显著的性能差异,在下面的示例中,速度降低了2.7 x左右:
Writing 100000 x 32768 bytes to 32768 bytes segment:
HeapMemorySegment (standalone) : 1,441 msecs
OffHeapMemorySegment (standalone) : 1,628 msecs
HeapMemorySegment (subclass) : 3,841 msecs
OffHeapMemorySegment (subclass) : 3,847 msecs
回到最初的性能
方法一、确保只加载一个memory segment实现。
- 使用工厂确保在任何地方都实例化相同的MemorySegment子类(Heap or off-heap)
方法二、Write one segment that handles both heap and off-heap memory
- 而这个MemorySegment就是
HybridMemorySegment
,它透明地处理堆和堆外内存。并且可以使用一个byte array(堆内存)或指向堆外内存区域(堆外内存)的指针进行初始化。
- 幸运的是,在不引入代码分支和针对两种不同的内存类型分别处理的情况下,有一个很好的技巧可以做到这一点。这个技巧是基于
sun.misc.Unsafe
方法对对象引用的解释方式。为了说明这一点,下面用从内存位置获取长整数的方法为例:
sun.misc.Unsafe.getLong(Object reference, long offset)
- 该方法接受对象引用,获取其内存地址,并添加偏移量以获得指针。然后,它从指向的地址获取8个字节,并将它们解释为一个长整数。由于该方法接受null作为引用(并将其解释为零),因此可以编写一个方法,从堆和堆外内存无缝地提取一个长整数,如下所示:
public class HybridMemorySegment {
private final byte[] heapMemory; // non-null in heap case, null in off-heap case
private final long address; // may be absolute, or relative to byte[]
// method of interest
public long getLong(int pos) {
return UNSAFE.getLong(heapMemory, address + pos);
}
// initialize for heap memory
public HybridMemorySegment(byte[] heapMemory) {
this.heapMemory = heapMemory;
this.address = UNSAFE.arrayBaseOffset(byte[].class)
}
// initialize for off-heap memory
public HybridMemorySegment(long offheapPointer) {
this.heapMemory = null;
this.address = offheapPointer
}
}