Spark shuffle模块主要包含三个组件:
- 负责shuffle数据读写的shuffleManager
- 负责跟踪shuffle数据地址的MapOutputTracker
- 负责远程拉去数据的Shuffle RpcService
ShuffleManager
ShuffleManager在SparkEnv初始化,参见SparkEnv.scala#301, 有一种实现SortShuffleManager(spark1.6之前是存在HashShuffleManager的)
HashShuffleManager(spark1.6之前)
HashShuffleManager使用HashShuffleWriter和FileShuffleBlockResolver,map task会将每个reduce partition的结果都输出成一个文件
问题:如果shuffle个数过多,每个ShuffleMapTask会产生reduce parition个数文件,造成系统文件句柄耗尽。
SortShuffleManager
SortShuffleManager使用Writer有三种实现:
- SortShuffleWriter
类MapReduce实现,内存buffer一定数据,超过阈值之后先在内存排序,输出文件。 如果数据量非常大,可能会输出多个文件,需要做一次或者多次归并排序
- UnsafeShuffleWriter
如果支持序列化,就会使用这个writer。这个writer的优势是可以直接对序列化的数据进行排序等操作,省去了大量反序列化的开销 先写到memory,超过阈值就会spill。和SortShuffleWriter区别就是,使用的是shuffleExternalSorter,通过taskMemoryManager使用 Tungsten方式进行内存管理。同时对数据进行排序。最后对spill的文件做一个merge。
merge的时候,是1~n partition id去遍历spills文件,从每个spill文件读取属于该partition的数据。所以partition id之间有序,但是partition内部不保证有序。
- BypassMergeSortShuffleHandle
如果map side没有使用Aggregator,并且partition数目小于spark.shuffle.sort.bypassMergeThreshold(default:200),就会使用这个writer。 类似HashShuffleWriter,不会将record先cache到memory,而是直接写文件,对每个reduce partition单独写一个文件。最后合并到一个文件里面,同时写index文件用于reduce的时候拉取。
MapOutputTracker
跟踪每个stage里面每个parition的output的location信息
- Driver端 MapOutputTrackerMaster
每个task执行完,都会将mapStatus返回driver,driver端DAGScheduler收到task完成事件,通知MapOutputTrackerMaster会记录数据属于哪个shuffleId和reduce partition.作业的所有mapStatus都存在master的shuffleStatuses字段 -
Executor上MapOutputTrackerWorker
在reduce时,worker根据ShuffledRDD的shuffleId从driver fetch这个id对应的mapStatus
RPcService
BlockManager 根据是否依赖外部shuffle服务(通过参数spark.shuffle.service.enabled,默认false),决定初始化内部或者外部shuffle client, 参见BlockManager.scala#128
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}
内置ShuffleService
NettyBlockTransferService 既是客户端也是服务端。 服务端对应的RpcHandler:NettyBlockRpcServer
主要支持两个操作:
- fetchBlocks 从remote node拉取blockId对应的文件
- uploadBlock 将一个local block上传到remote node(用于block复制)
外部ShuffleService
- 客户端:ExternalShuffleClient
- 服务端:
- Standalone Mode: ExternalShuffleService
- Spark on Yarn: YarnShuffleService
TODO(交互图)
通信协议
- RegisterExecutor
- OpenBlocks
注意:此时Executor上还是会启动NettyBlockTransferService, 只不过只用于Block复制,不用于shuffle服务
Sorted Shuffle 写入数据流程
- Executor执行ShuffleMapTask#runTask
- 根据Sparkenv获取shufflemanager为SortShuffleManager
- 通过shuffleDepedency拿到BasedShuffleHandle再通过SortShuffleManager获取对应的SortShuffleWriter
- SortShuffleWriter在写入之前先调用ExternalSorter的insertAll对数据进行聚合,根据shuffleDepedency中的partitioner,去汇聚同一个reduce partition的数据
- SortShuffleWriter#write会在executor本地写数据(memory或者spill到磁盘)
最后生成两个文件,data文件通过偏移量来区分不同reduce partition的数据,index文件保存不同partition的offset - 生成mapstatus,作为task result序列化后返回给driver
Sorted Shuffle 拉取数据流程
- 基于ShuffleRDD,通过BlockStoreShuffleReader,read的底层使用ShuffleBlockFetcherIterator
- ShuffleBlockFetcherIterator首先根据blockManagerId划分remote和local的请求,对于remote的数据,会根据shuffleId去driver拉取mapstatus,根据mapstatus,去对应的remote executor读取数据
- ShuffleBlockFetcherIterator对每个remote req,会使用ShuffleClient拉去Shuffle数据,放在内存Result队列里面
- BlockStoreShuffleReader#read不断迭代ShuffleBlockFetcherIterator,将拉取完成的blocks,反序列化成key-value的iterator
- 根据shuffleDependency的aggregator,将结果按照key汇聚到一个内存中PartitionedAppendOnlyMap,如果内存不足,就spill
-
最后将内存数据和磁盘数据归并成一个完整的key/value iterator, 给ShuffledRDD
ShuffleBlockFetcherIterator 通过一个spark.reducer.maxSizeInFlight控制同时拉去的shuffle数据量总大小,防止内存OOM