Spark的堆内内存
Spark之所以比MR快百倍,就是因为是基于内存迭代式计算的,于是就有了DAG有向无环图.所以搞清楚了Spark是怎么管理内存的,对我们后期Spark调优、性能有更深的理解
Spark的内存分为堆内内存 和 堆外内存
下图中的On-heap Memory为堆内内存,Off-heap Memory为堆外内存.
为什么要堆内内存?
Spark要通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。堆内内存的大小,由Spark应用程序启动时的–executor-memory
同时,堆内内存从内存分配上可以分为 静态方式 和 统一方式
堆内内存的静态方式:
堆内内存的统一方式:
申请堆内内存流程:
1. Spark在代码中new一个对象实例
2. JVM从堆内内存分配空间,创建对象并返回对象引用
3. Spark保存该对象的引用,记录该对象占用的内存
释放堆内内存流程
1. Spark记录该对象释放的内存,删除该对象的引用
2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存
Spark的堆外内存
为什么要堆外内存?
Spaek为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,利用JDK Unsafe API, Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的GC扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
RDD的持久化机制
弹性分布式数据集(RDD)作为Spark最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的RDD上执行转换(Transformation)操作产生一个新的RDD。转换后的RDD与原始的RDD之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark保证了每一个RDD都可以被重新恢复。但RDD的所有转换都是惰性的,即只有当一个返回结果给Driver的行动(Action)发生时,Spark才会创建任务读取RDD,然后真正触发转换的执行。
Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint或按照血统重新计算。所以如果一个RDD上要执行多次行动,可以在第一次行动中使用persist或cache方法,在内存或磁盘中持久化或缓存这个RDD,从而在后面的行动时提升计算速度。事实上,cache方法是使用默认的MEMORY_ONLY的存储级别将RDD持久化到内存,故缓存是一种特殊的持久化。堆内和堆外存储内存的设计,便可以对缓存RDD时使用的内存做统一的规划和管理
内存管理之多任务间的分配
Executor内运行的任务同样共享执行内存,Spark用一个HashMap结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为1/2N ~ 1/N,其中N为当前Executor内正在运行的任务的个数。每个任务在启动之时,要向MemoryManager请求申请最少为1/2N的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。
Shuffle的内存占用
执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两阶段对执行内存的使用:
Shuffle Write
若在map端选择普通的排序方式,会采用ExternalSorter进行外排,在内存中存储数据时主要占用堆内执行空间。若在map端选择Tungsten的排序方式,则采用ShuffleExternalSorter直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
Shuffle Read
在对reduce端的数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间。如果需要进行最终结果排序,则要将再次将数据交给ExternalSorter处理,占用堆内执行空间。