Spark Shuffle 服务

Spark shuffle模块主要包含三个组件:

  • 负责shuffle数据读写的shuffleManager
  • 负责跟踪shuffle数据地址的MapOutputTracker
  • 负责远程拉去数据的Shuffle RpcService

ShuffleManager

ShuffleManager在SparkEnv初始化,参见SparkEnv.scala#301, 有一种实现SortShuffleManager(spark1.6之前是存在HashShuffleManager的)

HashShuffleManager(spark1.6之前)

HashShuffleManager使用HashShuffleWriter和FileShuffleBlockResolver,map task会将每个reduce partition的结果都输出成一个文件

问题:如果shuffle个数过多,每个ShuffleMapTask会产生reduce parition个数文件,造成系统文件句柄耗尽。

SortShuffleManager

SortShuffleManager使用Writer有三种实现:

  • SortShuffleWriter

类MapReduce实现,内存buffer一定数据,超过阈值之后先在内存排序,输出文件。 如果数据量非常大,可能会输出多个文件,需要做一次或者多次归并排序

  • UnsafeShuffleWriter

如果支持序列化,就会使用这个writer。这个writer的优势是可以直接对序列化的数据进行排序等操作,省去了大量反序列化的开销 先写到memory,超过阈值就会spill。和SortShuffleWriter区别就是,使用的是shuffleExternalSorter,通过taskMemoryManager使用 Tungsten方式进行内存管理。同时对数据进行排序。最后对spill的文件做一个merge。

merge的时候,是1~n partition id去遍历spills文件,从每个spill文件读取属于该partition的数据。所以partition id之间有序,但是partition内部不保证有序。

  • BypassMergeSortShuffleHandle

如果map side没有使用Aggregator,并且partition数目小于spark.shuffle.sort.bypassMergeThreshold(default:200),就会使用这个writer。 类似HashShuffleWriter,不会将record先cache到memory,而是直接写文件,对每个reduce partition单独写一个文件。最后合并到一个文件里面,同时写index文件用于reduce的时候拉取。


image.png

MapOutputTracker

跟踪每个stage里面每个parition的output的location信息

  • Driver端 MapOutputTrackerMaster
    每个task执行完,都会将mapStatus返回driver,driver端DAGScheduler收到task完成事件,通知MapOutputTrackerMaster会记录数据属于哪个shuffleId和reduce partition.作业的所有mapStatus都存在master的shuffleStatuses字段
  • Executor上MapOutputTrackerWorker
    在reduce时,worker根据ShuffledRDD的shuffleId从driver fetch这个id对应的mapStatus


    image.png

RPcService

BlockManager 根据是否依赖外部shuffle服务(通过参数spark.shuffle.service.enabled,默认false),决定初始化内部或者外部shuffle client, 参见BlockManager.scala#128

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}

内置ShuffleService

NettyBlockTransferService 既是客户端也是服务端。 服务端对应的RpcHandler:NettyBlockRpcServer

主要支持两个操作:

  • fetchBlocks 从remote node拉取blockId对应的文件
  • uploadBlock 将一个local block上传到remote node(用于block复制)

外部ShuffleService

  • 客户端:ExternalShuffleClient
  • 服务端:
    • Standalone Mode: ExternalShuffleService
    • Spark on Yarn: YarnShuffleService
      TODO(交互图)

通信协议

  • RegisterExecutor
  • OpenBlocks
    注意:此时Executor上还是会启动NettyBlockTransferService, 只不过只用于Block复制,不用于shuffle服务

Sorted Shuffle 写入数据流程

  • Executor执行ShuffleMapTask#runTask
  • 根据Sparkenv获取shufflemanager为SortShuffleManager
  • 通过shuffleDepedency拿到BasedShuffleHandle再通过SortShuffleManager获取对应的SortShuffleWriter
  • SortShuffleWriter在写入之前先调用ExternalSorter的insertAll对数据进行聚合,根据shuffleDepedency中的partitioner,去汇聚同一个reduce partition的数据
  • SortShuffleWriter#write会在executor本地写数据(memory或者spill到磁盘)
    最后生成两个文件,data文件通过偏移量来区分不同reduce partition的数据,index文件保存不同partition的offset
  • 生成mapstatus,作为task result序列化后返回给driver

Sorted Shuffle 拉取数据流程

image.png
  • 基于ShuffleRDD,通过BlockStoreShuffleReader,read的底层使用ShuffleBlockFetcherIterator
  • ShuffleBlockFetcherIterator首先根据blockManagerId划分remote和local的请求,对于remote的数据,会根据shuffleId去driver拉取mapstatus,根据mapstatus,去对应的remote executor读取数据
  • ShuffleBlockFetcherIterator对每个remote req,会使用ShuffleClient拉去Shuffle数据,放在内存Result队列里面
  • BlockStoreShuffleReader#read不断迭代ShuffleBlockFetcherIterator,将拉取完成的blocks,反序列化成key-value的iterator
  • 根据shuffleDependency的aggregator,将结果按照key汇聚到一个内存中PartitionedAppendOnlyMap,如果内存不足,就spill
  • 最后将内存数据和磁盘数据归并成一个完整的key/value iterator, 给ShuffledRDD


    image.png

    ShuffleBlockFetcherIterator 通过一个spark.reducer.maxSizeInFlight控制同时拉去的shuffle数据量总大小,防止内存OOM

©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342