Spark是基于内存的计算引擎,就是说它高效的使用了分布式节点上的内存资源,尽可能多的使用内存,而不是将数据写入磁盘。内存管理机制就是其中的核心
首先Spark支持堆外内存的使用,原因在于:
- JVM对象模型的内存开销比较大
- GC的开销影响性能,Spark框架限定了数据的流程,即数据如在jobs,stages和tasks的使用范围。因此,Spark比JVM垃圾收集器了解更多关于内存块生命周期的信息,能够比JVM更有效地管理内存
底层基于
sun.misc.Unsafe
实现
内存管理MemoryManager
抽象类由两个具体实现
-
StaticMemoryManager
静态内存管理 -
UnifiedMemoryManager
,统一内存管理,从1.6版本开始默认使用
内存管理内部包含Storage和Execution两部分内存的分配与释放相关的接口,由两个具体子类实现,子类的区别在于Storage和Execution内存之间的边界是动态的还是静态的,在堆外内存的支持上也有差异
SparkEnv创建时初始化内存管理器:
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
内存模式MemoryMode
选择
// MemoryManager抽象类内部
final val tungstenMemoryMode: MemoryMode = {
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
堆内内存的申请和释放都是由JVM完成的,因此Spark的onHeap内存管理只是进行逻辑上的记录和规划,其中Java对象占用的内存大小是通过周期性采样近似估算而来,可能存在误差,被Spark标记为释放的对象实例可能还没有被JVM收回,导致实际可用内存小于Spark自身记录值,所以Spark不能准确记录内存,也就无法避免内存溢出(Out of Mamory),但是通过对storage和execution内存分别进行规划管理,一定程度上可以提升内存利用率,减少异常
堆外内存依赖Unsafe API可以直接在操作系统中申请,存储经过序列化的二进制数据,能够被精准的申请和释放,序列化数据占用的空间大小也可以被精确计算,更进一步优化内存管理。默认情况下不开启堆外内存,可以通过spark.memory.offHeap.enabled
设定开启堆外内存,通过spark.memory.offHeap.size
设定堆外内存的大小
内存管理接口:
//申请内存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
def acquireExecutionMemory(numBytes: Long,taskAttemptId: Long,memoryMode: MemoryMode): Long
//释放内存
def releaseExecutionMemory(numBytes: Long,taskAttemptId: Long,memoryMode: MemoryMode): Unit
def releaseAllExecutionMemoryForTask(taskAttemptId: Long)
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
def releaseAllStorageMemory(): Unit
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode)
// 查询内存的使用
def executionMemoryUsed: Long
def storageMemoryUsed: Long
def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long
其中用于将RDD分区内数据由不连续的内存空间占用cache为连续存储空间的过程被称为 展开
Unroll
,对应于acquireUnrollMemory
内存池 MemoryPool
MemoryManager通过内存池机制管理内存,它表示内存空间中一段大小可以调节的区域,它是一个抽象类,需要MemoryManager作为一个锁对象实现同步,控制内存池大小变化过程中避免出现竞争
具体由ExecutionMemoryPool
和StorageMemoryPool
分别用来计算执行和数据缓存,由于内存模式,对应四种资源池
onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
内存页
MemoryManager中设定了默认的内存页大小,也可以通过spark.buffer.pageSize
指定
大小位于1M到64M,必须是2的幂次,具体取决于执行内存的大小,core的数目
StaticMemoryManager
用于数据存储与计算执行的内存占比和界限是固定的,彼此不能相互占用内存
静态内存不支持堆外内存作为数据存储的StaticMemoryManager does not support off-heap storage memory,实际只有三个内存池
- StorageMemory: systemMaxMemory * memoryFraction (
spark.storage.memoryFraction
, 0.6) * safetyFraction(spark.storage.safetyFraction
, 0.9),其中有部分用于unroll(spark.storage.unrollFraction
, 0.2) - ExecutionMemory: systemMaxMemory * memoryFraction(
spark.shuffle.memoryFraction
, 0.2) * safetyFraction(spark.shuffle.safetyFraction
, 0.8) - Other:系统保留,存储运行中产生的一些对象
UnifiedMemoryManager
静态方式往往无法高效试用所有的应用类型,需要合理调整内存比例,否则容易造成内存浪费
- reservedMemory:保留固定300MB空间
系统内存需要大于保留内存的1.5倍,即450MB,否则会报错 - usableMemory:实际可用内存
- 数据存储和计算执行公用内存区域(
spark.memory.fraction
, 0.6),两者初始比例通过spark.memory.storageFraction
设定,默认为0.5 - other,默认占据剩余的空间
- 数据存储和计算执行公用内存区域(
当双方的空间都不足时,则存储到磁盘,如果一方充足,可以借用,执行内存如果被占用,可以要求存储内存归还借用,如果存储内存被执行内存占用,无法归还,因为其中牵涉的shuffle较为复杂,总的来说就是执行内存实际上优先级比存储高,借用无需归还
存储内存管理
入口是StorageMemoryPool
,核心是用来申请内存的acquireMemory
方法,如果内存池中的内存不足,调用MemoryStore
的evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
方法,将内存数据块落盘
当需要将内存借给执行内存时,通过freeSpaceToShrinkPool
收缩存储区域,如果存储区域剩余空间不足,内部还是会调用evictBlocksToFreeSpace
来收缩
当需要cache RDD数据到内存时,BlockManager
通过迭代器接收数据,它将iterator展开(unroll
)成一个数组,unroll需要的内存从存储空间中获取,对应的内存块根据是否序列化数据分为两种,DeserializedMemoryEntry
用一个数组存储未序列化的对象实例,SerializedMemoryEntry
用ChunkedByteBuffer
存储二级制数据,因为iterator的数据可能非常大,无法完全装入内存中,因此实际过程中会逐渐unroll迭代器,即周期性的检测评估是否还有充足的内存空间,内存不足就继续申请(默认是翻1.5倍),如果最终完全展开,就将其存储到entries,否则就根据cache级别,可能会选择存储到DiskStore
所有的内存Block块记录在MemoryManager
的MemoryStore
的entries字段
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
因为链表中记录了访问顺序,具体当内存不足时就可以根据LRU策略evict旧的数据块,根据存储级别决定是直接淘汰还是写入磁盘
淘汰block的选择策略:迭代链表,block的存储模式要相同,blockId不能与要添加的block属于同一个RDD,避免循环淘汰,不能淘汰正在被读取的block,需要获取到候选淘汰的blockId的排它锁
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
执行内存管理
对应ExecutionMemoryPool
,核心memoryForTask = new mutable.HashMap[Long, Long]()
记录每个Task的内存用量
假设当前Executor同时执行的任务数目为n,每个任务可以占用的内存为[1/2n,1/n],每个任务要保证至少能够申请到1/2n的执行内存,如果通过acquireMemory
申请内存时,能够授予的内存小于请求的数目,并且该task总内存小于最低要求(toGrant < numBytes && curMem + toGrant < minMemoryPerTask
),会在时阻塞到锁对象MemoryManager
上,直到其他任务释放内存或者活跃任务数目发生变化时被唤醒,内部采用notifyAll
和wait
同步机制
主要用于shuffle等计算过程对内存的需求
Tungsten 内存优化
内存分配器的基础是MemoryAllocator
接口,内部有allocate和free两个方法来申请内存和释放内存,对应UnsafeMemoryAllocator
和HeapMemoryAllocator
两种具体实现,在TaskMemoryManager
内部,当申请内存通过MemoryManager
的批准后,用来分配内存页面
MemoryLocation
内存位置,内部包含两个字段
@Nullable
Object obj;
long offset;
- off-heap内存模式下,直接记录一个内存地址,通过64bit的offset存储绝对地址
- on-heap内存模式,由于GC过程中对堆内存进行重组,因此地址的定位需要通过对象在堆中的引用,以及该对象内的偏移来实现,其中obj为堆中该对象的引用,offset对应偏移量
MemoryBlock
表示连续的内存块,继承了MemoryLocation
,on-heap或off-heap都使用连续的内存空间来存储数据(on-heap模式下,减少对象数目,可以降低GC的开销)
成员字段:
-
private final long length
:表示内存块的长度 -
public int pageNumber
:表示内存页的编号
任务内存管理 TaskMemoryManager
Spark的数据以分区为单位进行处理,每个分区对应一个Task,因此内存的管理以Task为单位,每个Task在启动时创建一个TaskMemoryManager
用来分配内存页面,相关信息存储在页表pageTable
上,采用类似操作系统中虚拟内存逻辑地址的概念,并将逻辑地址映射到实际的物理地址
每个页面就是一个MemoryBlock,逻辑地址为64bit,其中页码为高13位,页内偏移占51位,最大页码数量8192,页面最大数量受限制于on-heap模式下的long[] array
,为(2^31 - 1) * 8
字节,大约17GB,一共支持140TB的内存空间
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
//使用位图标识对应的页是否被分配
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
缓存感知计算
缓存感知计算通过更有效地使用L1/L2/L3CPU缓存来提高数据处理的速度,因为它们比主存快几个数量级,在分析Spark用户应用程序时,发现大部分CPU时间都花在等待从主内存中提取数据上,在Tungsten中,设计了缓存友好的算法和数据结构,来使得Spark应用程序将花费更少的时间等待从内存中获取数据,并将更多时间用于执行有用的工作
这里的优化主要是用于排序操作,sort通常是有很好的缓存命中率,因为它是连续扫描访问的模式,不过针对引用对象的排序缓存命中率极低,因为每次比较操作需要对两个指针解引用,他们指向的位置通常位于主存中
所以采用的办法就是将每个记录的key值和对应的引用指针放在一起,避免了随机内存查找,直接顺序扫描就可以
以ShuffleExternalSorter
为例:
存储序列化后的record到申请的page页面上,同时,每条记录会产生一个封装记录指针PackedRecordPointer
,形式如下
[24 bit partition number][13 bit memory page number][27 bit offset in page]
在没有字节对齐的情况下,最大的Page大小为2^27 bits = 128 MB,对应总大小 2^13 * 128 MB = 1 TB,所以每个task最大定位1TB大小的内存
该记录指针在ShuffleInMemorySorter
内部排序,内存不足或者结束时,根据排序后的结果,读取页面内对应的记录,然后写出