一. 编程模型
二. 组件简介
三. 作业执行
四. 内存管理
五. 存储原理
六. shuffle
七. 性能调优
八. 知识脑图
一. 编程模型
1. RDD的基本属性
分区
在一些大规模计算中,一个数据集中的数据量会达到非常大的级别,而这些数据难以在一台机器上进行存储和计算,rdd的思路就是将这些数据进行分区,一个大的数据集被分为很小的分区
rdd的计算是以分区为单位进行的,而且同一分区的所有数据进行相同的计算逻辑.对于同一个分区的数据而言,必须执行相同的操作:要么都执行,要么都不执行.分区的数量决定了同时执行的任务的数量,因为可以为每个分区启动一个计算任务用于单独计算这个分区的数据
计算函数
rdd的数据被分区了,但是每个分区的数据是如何得来的?一个是rdd的数据来源只有两种:一个是从数据源或集合中进行加载得到rdd的数据;另一个是通过其他rdd进行一定的转换得到的数据,无论是哪一种方式,rdd的数据其实都是通过rdd的计算函数得到的
依赖
每个父rdd的分区最多被子rdd的一个分区使用,这种依赖称为窄依赖;多个子rdd的分区数据依赖父rdd的同一个分区的数据
什么时候发生shuffle???
shuffle算子是否一定触发shuffle?
分区器
分区器的作用是:如何把map阶段的结果进行分组,区分出结果是给reduce阶段的rdd哪个分区
首选运行位置
每个RDD对于每个分区来说有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机运行.如Hadooprdd能够实现加载数据的任务在相应的数据节点上执行
2. RDD的缓存
rdd是进行迭代式计算,默认并不会保存中间结果的数据,在计算完成后,中间的结果数据都将会丢失,如果一个rdd在计算完成后,不是通过流水线的方式被一个rdd调用,而是被多个rdd分别调用,则在计算过程中就需要对rdd进行保存,避免rdd的二次计算,当一个rdd被缓存后,后面调用的时候需要rdd的数据直接从缓存中读取,而不是对rdd再次进行计算.尤其是一个rdd经过了特别复杂的计算过程,对其缓存可以极大的提高程序的执行效率
因为rdd是分布式的,不同的分区散落在不同的节点上,所以rdd的缓存也是分布式的
3. Spark RDD操作
Spark定义了很多对rdd的操作,主要分为两类:transformation和action,transformation并不会真正的触发job的执行,它只是定义了rdd之间的转换关系,即rdd之间的lineage,只有action才会触发job的真正执行
transformation
操作 | 说明 |
---|---|
map | 迭代RDD中的每个元素生成新的RDD |
filter | |
flatmap | |
mappartitions | |
distinct | |
groupbykey | |
reducebykey | |
union | |
coalesce | |
repartition |
action
操作 | 说明 |
---|---|
collect | |
count | |
first | |
take | |
saveAsTextFile | |
foreach | |
reduce |
持久化
操作 | 说明 |
---|---|
MEMORY_ONLY | |
MEMORY_AND_DISK | |
MEMORY_ONLY_SER | |
MEMORY_AND_DISK_SER | |
DISK_ONLY | |
MEMORY_ONLY_2 |
4. 源码分析
环境准备
idea环境,安装scala插件,创建一个maven项目...
RDD源码
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
val id: Int = sc.newRddId()
RDD分区源码
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
RDD的partition是一个特质,其实现很简单,就是记录了分区的索引并重写了hashcode,RDD通过一个partition数组,即可表示出这个RDD由多少个分区组成,每个分区的索引用于表示出每个不同的分区,子类通过实现partition的特质,从而具有更加丰富的分区功能
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
}
RDD计算函数源码
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD依赖源码
RDD分区器源码
RDD首选运行位置源码
补充说明
- (不发生shuffle的情况下)在多个rdd转换的过程,因为他们之间的分区是一一对应的,也就是每个rdd只依赖父rdd的一个固定的分区的数据即可.每个分区中的数据可以通过一个流水线任务(task)转换完成,各个任务之间相互独立,互不影响
- 在计算rdd的过程中,如果出现shuffle,则其过程有如下特点:第一点,必须首先计算出依赖rdd的所有分区的数据,然后后续rdd才能继续进行计算.第二点,shuffle的过程必然分为两个阶段,map和reduce阶段.第三点,后一个阶段必须在上一个阶段的数据全部完成计算以后才能开始计算,所以必须拆为两组不同的任务按照先后顺序执行
二. 组件介绍
1. 术语介绍
- Appplication
- Job
- Task
- Stage
- Driver
- Executor
- Worker Node
2. Spark RPC
3. Driver
Driver进程其实就是运行SparkContext进程.在SparkContext初始化的过程中,创建了一些组件.这些组件负责实现Job执行,stage划分,task提交等Driver的功能
SparkEnv
RpcEnv
SerializerManager
ShuffleManager
BroadcastManager
BlockManager
MemoryManager
MapOutputTracker
listenerBus
SparkUI
DAGScheduler
TaskScheduler
SchedulerBackend
4. Executor
5. 运行模式
6. 存储简介
在Spark中有很多需要存储数据的地方,如对RDD进行缓存,shuffle时map阶段的数据存储,广播变量时各节点对变量的存储等.这些数据的存储都离不开Spark的存储模块.Spark的存储模块将需要存储的数据进行了抽象,无论是说明类型的数据,无论数据是什么内容,只要需要存储的数据都称为block,每个block都有唯一的id进行标识,并且存储模块提供多种不同级别的存储,比如是内存存储,磁盘存储,堆外内容存储等.
7. 源码分析👍
SparkContext主要组件
SparkEnv创建
消息总线创建
TaskScheduler和SchedulerBackend
DAGScheduler
Master
Worker
应用提交流程
三. 作业执行原理
1. 整体执行流程
2. Job提交
- 为什么需要action操作
- Job提交
- 分布式执行
- Spark实现
3. Stage划分
- 宽依赖和窄依赖
- 如何判断RDD之间的依赖关系
- Stage划分
- Spark实现
4. Task划分
5. Task提交
6. Task执行
7. Task结果处理
8. 源码分析
四. 作业执行原理
1. 内存使用概述
- 堆内存和堆外内存
- 存储内存和执行内存
- 逻辑划分
2. 内存池的划分
- 内存池概念
- 内存池划分
- 内存模式
3. 内存管理
- 静态内存管理器
- 统一内存管理器
4. 源码分析
五. 存储原理
六. shuffle
1. Spark执行shuffle的流程
总体流程
shuffleRDD的生成
在shuffledependency中包含了在shuffle过程中使用的几个重要的组件,这些组件如下
- Partitioner,用于将key进行分组,判断哪个key应该分到哪一组中.partitioner确定了所有的key一种能够分为多少组.这些分组的数量也决定了下游reduce任务分区的大小.在map端数据进行分组时,便将每个key使用partitioner进行分组,进而得到每个key所属的分组
- Aggregator,用户将同一个key的两个value进行聚合,也可以将两个聚合后的值进行聚合.在reduce端将使用Aggregator将同一个key的所有value进行聚合,如果定义了在map进行聚合,在执行map过程的时候,也会调用Aggregator首先在map端进行聚合
- ShuffleHandle,用于在map端获取写入器(shuffleWriter)将分区的数据写入文件中,在reduce端用于获取分区读取器(shuffleReader),读取该分区中对应的不同的map端输出的数据
- 此外,在shuffledependency中还记录了依赖的父RDD,该shuffle的id,是否对key进行排序,是否在map端进行聚合
Stage的划分
在执行的rdd触发action操作后,DAGScheduler会递归RDD的依赖关系,每遇到一个shuffledependency就会将依赖的RDD划分到新的stage中,最终一个job被划分到有先后依赖关系的多个stage中.最后的stage称为ResultStage,之前所有的Stage都为ShuffleMapStage.一个ShuffleMapStage加载数据的过程可能直接从数据源中加载,也可以是某个shuffle过程的reduce阶段,从上个stage的map端输出进行加载.但所有的ShuffleMapStage运行完成后,都会将数据分组到当前节点的BlockMananger的文件中,等待下一个stage来拉取结果
Task的划分
Stage划分完成后,每个Stage会根据计算RDD的分区的大小划分为多个Task,每个Task计算RDD的一个分区的数据,ShuffleMapStage中划分的task为shufflemaptask,shufflemaptask会被序列化到executor节点中进行执行,shufflemaptask的执行会将该分区的数据进行分组,如果需要map端聚合在分组过程中则还会进行聚合操作.最终将分组的数据写入到所在节点的文件.shufflemaptask在序列化时,发送到executor中的内容主要有该stage中执行map操作的rdd,下游rdd依赖的shuffledependency,计算的分区等
Map端的写入
Reduce端的读取
2. Shuffle内存管理
任务内存管理
内存消费者
内存消费组件
在执行shuffle的过程中,有几个重要的地方需要申请执行内存,在map端将key进行聚合的过程(如果需要),将key按照分区排序的过程(如果需要)和在reduce端将数据聚合的过程,对key排序的过程(如果需要).在这些过程中,都需要申请执行内存完成需要的操作.对于不同的操作如排序,聚合等,spark使用不同的组件来完成其功能.其中,externalsorter和shuffleexternalsorter用于map端对迭代器的key按照分区排序,externalsorter还用于reduce端对key的排序,Exeternalappendonlymap用于对于迭代器中key的聚合
Tungsten内存管理
Tungsten内存消费组件
3. ShuffleWrite
- HashshuffleManager
- HashshuffleWriter:在map端每个shufflemaptask执行时,都会获取一个shuffleWriter,HashshuffleWriter在写入map端数据的时候,会对迭代器中的数据使用partitioner进行分组,为每个分组生成一个文件,将分组中的数据写入到文件中,如果map端需要聚合时,hashshufflewriter会使用externalappendonlymap首先对数据进行聚合,将聚合后的数据分组写入到不同的文件中.假如在map中的task数量为10000,在reduce的端的task数量为1000,那么在集群中map端的过程会形成1000 * 1000各文件,由此可见,使用hashshufflewriter将会产生大量的文件,会对系统的IO造成巨大压力,而且在对文件读写需要打开文件的输出流,打开大量的文件将会消耗大量的内存,使executor端的内存也产生很大的压力.为了解决大量文件的问题,spark引入consolidation机制,同一个executor中的同一个CPU核执行的task,可以将相同的分组写入到同一文件中,这在一定程度上减少了文件的生成
- SortshuffleManager:SortshuffleManager可以获取三种不同的shufflewriter,这三种shufflewriter在map端最终都将数据写入了一个文件中,避免了大量文件的生成,减缓了shuffle过程中io压力,在获取三种不同的shufflewriter中,其写入数据过程时不同的,但最终写入的文件格式和效果是一致的,都是按照key按照分区进行排序,依次将不同分区的数据序列化后写入到同一个文件,再使用一个index小文件记录每个分区的数据在文件中的索引即可.在Spark中使用filesegment对象表示文件的一部分,filesegment中保留有文件索引的引用和该文件的偏移量的开始与文件的长度
- Bypassmergesortshufflewriter
使用Bypassmergesortshufflewriter的前提是map端数据不需要聚合,并且生成的分区数小于200,该值可以通过spark.shuffle.sort.bypassMergeThreshold
配置,因为Bypassmergesortshufflewriter与hashshufflewriter非常类似,每个task会为下游的每个分区生成一个文件,在这种情况下如果分区数太多会造成大量的文件被打开,产生io瓶颈,因此使用该shufflewriter时,分区数不应该太多,不必执行按照分区排序的过程,在小分区的情况下就能获得不错的性能,Bypassmergesortshufflewriter为每个分区生成一个临时文件,最终将所有的文件合并,按照分区顺序写入一个文件中,同时生成对应index索引文件 - sortshufflewriter
- unsafeshufflewriter
4. ShuffleRead
七. 性能调优
1. 任务监控
- SparkUI
- Spark运行日志详解
Driver端的日志
- 在提交Driver程序,sparkcontext初始化,通过driverwrapper将程序提交到集群中运行
- spark context在初始化时,会创建sparkenv;driver端的sparkenv中创建了mapoutputtracker用于保存shufflemap端的结果;创建了blockmanagermaster用于管理所有的blockmanager,维护block的元数据;创建了blockmanager,用于保存该节点block块,在blockmanager中创建了diskblockmanager用于磁盘数据存储,创建了memorystore用于内存存储,后续过程启动sparkUI
- 在blockmanager创建过程中还会创建blocktransferservice,用于该节点与其他节点通信.在shuffle过程中,reduce端根据blockid到map端拉取数据就是通过该组件实现的
- 在创建schedulerbackend后,会通过Standaloneappclient将application提交到master中,master分配资源启动executor,executor启动成功后会再次向driver中进行注册,通知driver启动成功
- 当executor初始化完成后,其节点中的blockmanager会到driver中的blockmanager中进行注册,driver中的blockmanager也会进行注册
- 在使用textfile方法的时候,spark会将Hadoop的配置文件进行广播,其他executor到hdfs中拉取数据时,使用此配置.广播变量最终会存储在memorystore中,也会显示存储数据的大小和剩余空间的大小
- 在用户编写的代码中action操作会触发job操作的执行,每个job会被划分为stage,如果一个stage中所有的父stage都计算完成或没有父stage,则会提交这个stage
- 每个提交的stage都会被划分为多个task,task的数量与rdd分区一致,所有的task计算逻辑都是相同的,将task序列化后进行广播,这样可实现多个task在同一个executor执行时,仅仅保存一份task的二级制数据,多个task被封装为taskset交给taskscheduler
- schedulerbackend将等待的task提交到有空闲CPU的executor中,并输出task的id,stage id,运行的executor,计算的分区,本地化级别等信息
- 当计算的task运行完成时,会将结果返回到driver端,输出task的计算时间,executor,计算的分区等信息.当有task计算完成会有executor的CPU,这时会将等待运行的task提交到空闲的executor中,如此循环往复,直到stage的所有task都完成计算
- 当该stage中所有task都计算完成时,会将taskset从队列中移除,完成该stage的计算,此时会查找依赖此stage的子stage,将子stage进行提交,计算子stage
- 当某个job的所有stage完成后,该job计算完成,输出job的计算时间,运行下一个job
Executor端的日志
- driver端负责stage的划分,task的提交.executor端负责任务的执行并将任务结果进行返回.executor在初始化时同样会创建sparkenv,在sparkenv中创建blockmanager,memorymanager等组件
- 当executor端接收到task时,会运行task,并输出该task属于哪个stage,在首次运行task时,会从广播变量中获取task的二级制数据,该节点的blockmanager会从远程的节点拉取
- 在task运行过程中,如果对RDD进行缓存,会将该RDD的计算的分区的数据缓存到节点的blockmanager,如果指定使用内存,则当内存不足时,会提示缓存失败
- 当blockmanager中存储内存不足时,则将内存中的数据溢写到磁盘中
- 当task运行完成时,会将task的运行结果返回到driver,并输出计算结果的大小
- 在执行shuffle操作时,map端使用exeternalsorter对数据进行分组,按照分区进行排序,如果内存不足,则会将内存中的数据写入到磁盘中,为后续数据迭代留出空间.在reduce端使用exeternalsorter对key进行排序时,如果内存不足则同样会溢出到磁盘中
- 在reduce端对所有的map端的task中的数据进行聚合时,会使用exeternalappendonlymap组件,如果内存不足,该组件会将数据溢出到磁盘中
2. 程序优化
- 并行度
- 避免创建重复的RDD
- RDD持久化
- 广播变量
- 高性能序列化库
- 优化资源操作连接
3. 资源优化
- CPU,spark应用程序分配的CPU的个数决定了集群中能够同时并行运行的task的个数
- 内存
- 磁盘
- executor数量的权衡:如果仅仅对于CPU的使用而言,其实是相同的,因为每个task的执行会占用一个CPU,task不会关心是使用哪个executor的CPU运行的.对于内存使用则不同,因为如果在一个executor中CPU的数量过多,在该executor中执行的task数量会变多,如果task需要进行shuffle操作,则所有task会共享同一个executor中的执行内存.假如此时一个executor分配的内存过少,则会造成每个task分配的执行内存过少.同样,如果对RDD进行缓存时,在一个executor中,此时也需要增加内存满足多个分区的数据缓存使用.如果在任务中使用到了大的广播变量,此时分配的executor越多,那么共享变量的副本数就越多.此外,executor进程本身也需要消耗内存
- spark管理内存比例
- 使用alluxio加速数据访问
4. shuffle过程优化
- map端聚合
- 文件读写缓冲区
- reduce端并行拉取数量
- 溢写文件上限
- 数据倾斜