Spark 源码分析(2)RDD 的依赖

RDD 的成员之一是依赖集,依赖集也关系到任务调度

源码

Dependency代码主要在一个源文件中:core/Dependency代码中有5个类。
除此以外在 core/rdd/PartitionPruningRDD还有一个PruneDependency类。
他们的名称和继承关系如下图:


3017B56B-EBD0-4E3A-BAD8-777B872194D4.png

通过阅读代码可以得到以下信息:

  1. 依赖的根类是Dependency,只有一个RDD 成员,表示依赖的对象。这类继承了Serializable类,是可以序列化的。
  2. 依赖分为两大种,一个叫窄依赖(Narrow),另一个就洗牌依赖(Shuffle,很多材料也叫作“宽”依赖)。
  3. 从数量关系上说,“1-to-1”(OneToOne)、“n-to-1”(Range)、“1-to-部分分区”(Prune,剪枝)是窄依赖,宽依赖是“n-to-n”的。
  4. “1-to-1”是RDD的默认依赖。上节中的MapPartitionRDD是一对一的转换,就包含“1-to-1“依赖。
  5. ”n-to-1“的依赖只有一个使用场景——UnionRDD,“交”运算,多个 RDD 合并到一个 RDD 中。
  6. 剪枝依赖是个私有对象,用于优化,减少数据载入。
  7. 洗牌依赖复杂一些。

只有 RDD 的转换(Transformations)才用到依赖,RDD 的操作(Actions,如reduce、collect等)就不需要依赖,直接运行SparkContext.runjob()函数。RDD 有哪些转换和操作?阅读文档: Spark Programming Guide - Spark 2.0.2 Documentation

洗牌依赖

洗牌依赖也只是相对复杂,代码也不长。

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

是的,只有8行代码,核心内容是:

  1. 这个类有三个泛型类型,K=key,V=value,C=combiner;
  2. 洗牌依赖只能用于Product2[K, V]及其父类,即 KV 数据;
  3. 成员有 分区器(partitioner) 、序列器(serializer)、排序器(keyOrdering)、聚合器(aggregator)、map 端聚合开关(mapSideCombine);
  4. _rdd.context.newShuffleId() 获得一个自增的 ID;
  5. _rdd.context.env.shuffleManager.registerShuffle 获得几个洗牌的句柄。通过core/shuffle/sort/SortShuffleManager代码可以知道,一共有三种句柄:
    1. 分区数很少(小于变量spark.shuffle.sort.bypassMergeThreshold,默认200)时,用BypassMergeSortShuffleHandle,直接发送数据合并,不用耗时的序列化和反序列化;
    2. 否则,如果能序列化,则用SerializedShuffleHandle,用序列化和反序列化,降低网络 IO;
    3. 否则,使用基础的BaseShuffleHandle

Scala语法

在 Java 中使用null 是非常容易出错的,在 Guava( GitHub - google/guava: Google Core Libraries for Java 6+ ) 中提供了 Optional 来避免使用 null。
同样的 Scala 自带了 Option 来避免使用 null。

Option 有两个子类,Some 和 None

scala> val capitals = Map("France"->"Paris", "Japan"->"Tokyo", "China"->"Beijing")
capitals: scala.collection.immutable.Map[String,String] = Map(France -> Paris, Japan -> Tokyo, China -> Beijing)

scala> capitals get "France"
res0: Option[String] = Some(Paris)

scala> capitals get "North Pole"
res1: Option[String] = None

None.get 会报错,用 getOrElse 可以给取不到值的时候赋默认值:

scala> capitals get "North Pole" get
warning: there was one feature warning; re-run with -feature for details
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  ... 33 elided

scala> capitals get "France" get
warning: there was one feature warning; re-run with -feature for details
res3: String = Paris

scala> (capitals get "North Pole") getOrElse "Oops"
res7: String = Oops

scala> capitals get "France" getOrElse "Oops"
res8: String = Paris

在 Spark 中大量使用了 Option。

疑问列表

我将阅读过程中的未解内容记录下来,留待以后阅读代码时解答。疑问一个一个划掉,就是成长的过程。

  1. reduce 等 RDD 操作是如何执行的?
  2. groupByKey 等洗牌操作是如何执行的?不同的洗牌类型有什么用?

结论

  1. 在 RDD 的转换时,会用到依赖
  2. 依赖包括窄依赖(1-to-1、n-to-1关系)、洗牌依赖(n-to-n 关系)
  3. 洗牌依赖包含分区器、序列器、排序器、聚合器、map聚合开关、ID、洗牌类型句柄等成分组成。洗牌类型句柄有三种。

本文源码

Dependency spark/Dependency.scala at master · apache/spark · GitHub
三种洗牌句柄 spark/SortShuffleManager.scala at master · apache/spark · GitHub
Option示例来自: 【Scala】使用Option、Some、None,避免使用null - 简书


@ Kangying Village, Beijing, China

Spark源码/Dependency

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容