AQE中的CoalesceShufflePartitions和OptimizeLocalShuffleReader

背景

本文基于spark 3.1.2
在之前的文章spark CTAS nuion all (union all的个数很多)导致超过spark.driver.maxResultSize配置(2G)里我们说到,在AQE开启,且在分区合并初始化分区设置为1000的情况下,为什么了在开启了分区合并和优化本地化shuffle读取的前提下,还是会导致分区很大
默认情况下,分区合并和本地化shuffle都是开启的。且我们这里是AQE下SortMergeJoin转BroadcastHashJoin

划重点说结论

CoalesceShufflePartitions 做的分区合并,reduce端的任务会减少
OptimizeLocalShuffleReader 做的本地读取化,是合并map task任务产生的分区.
听不懂!,下文中我会用图解来给你做解释

分析以及解释

我们以最小的demo来做解释,直接上sql,运行以下sql:

SELECT  a1.order_no
                    ,a1.need_column
                    ,a1.join_id
            FROM    temp.actul_a a1 
            join temp.actul_a a2 on a1.join_id = a2.join_id and a2.need_column = 'we need it' 
            WHERE a1.need_column ='others needs it'

只分区合并,不做本地化读取

设置一下参数:

set spark.sql.adaptive.localShuffleReader.enabled=false;

运行的的物理计划,如下:


image

原本shuffle完之后为1000分区的,在经过了分区合并以后直接变成了34个分区,
且分区customerShuffleReader 的标识为coalesced
这说明在分区合并的情况下分区数是大量减少了

既分区合并又优化本地化读取

set spark.sql.adaptive.coalescePartitions.enabled=true;
set spark.sql.adaptive.localShuffleReader.enabled=true;
这两个参数默认都是true,在此设置一下只是为了强调开启这两个参数

运行的物理计划,如下:


image

可以看到在经过分区合并和以及本地读取优化以后,直接变成了65个分区,比没有经过本地读取优化多出了一半的分区

不经过分区合并,只本地优化读取

set spark.sql.adaptive.coalescePartitions.enabled=false;
注意,此条件设置完之后,shuffle后的分区数回到了400(我们默认的spark.sql.shuffle.partitions为400)

运行的物理计划如下:


image

可以看到在只有本地读取优化之后,分区直接变成了395个,和shuffle后的分区数400 没有差多少。
但是为什么合并分区和本地化优化的分区数不一样?他们到底是怎么做的呢?

为什么合并分区和本地化优化的分区数不一样

解释一下:

  • 从逻辑上:
    CoalesceShufflePartitions是把shuffle后的结果集中的各个分区(也是reduce task)定义了一种合并规则,这样在拉取数据的时候,就会按照之前合并的规则来拉取数据
    OptimizeLocalShuffleReader也是把shuffle后的结果集中的各个分区定义了一种合并规则,只不过这个规则定义是在一个map task级别,也就是说是定了map任务产生的reduce分区之间的合并

上图:


image
  • 从代码上:
    CoalesceShufflePartitions的代码在spark 3.0.1 AQE(Adaptive Query Exection)分析有说过,这里不复赘.
    我们着重分析一下OptimizeLocalShuffleReader :
    private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
      plan match {
        case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
          CustomShuffleReaderExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))
        case s: ShuffleQueryStageExec =>
          CustomShuffleReaderExec(s, getPartitionSpecs(s, None))
      }
    }
    
    // TODO: this method assumes all shuffle blocks are the same data size. We should calculate the
    //       partition start indices based on block size to avoid data skew.
    private def getPartitionSpecs(
        shuffleStage: ShuffleQueryStageExec,
        advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
      val numMappers = shuffleStage.shuffle.numMappers
      val numReducers = shuffleStage.shuffle.numPartitions
      val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
      val splitPoints = if (numMappers == 0) {
        Seq.empty
      } else {
        equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
      }
      (0 until numMappers).flatMap { mapIndex =>
        (splitPoints :+ numReducers).sliding(2).map {
          case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
        }
      }
    }
    

主要是在getPartitionSpecs这个方法里,PartialMapperPartitionSpec是重点在以下的分析中会用到,这个方法主要就是进行分区规则设定的核心,看不懂?
核心代码拷贝过来,放在IDE中运行:

    val numMappers = 33
    val numReducers = 400
    val expectedParallelism = numReducers

    val splitPoints = if (numMappers == 0) {
      Seq.empty
    } else {
      equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
    }
    val partitionSpecs = (0 until numMappers).flatMap { mapIndex =>
      (splitPoints :+ numReducers).sliding(2).map {
        case Seq(start, end) => (mapIndex, start, end)
      }
    }

    println(s"splitPoints: $splitPoints")
    println(s"splitPoints.length: ${splitPoints.length}")
    println(s"partitionSpecs: $partitionSpecs")
    println(s"partitionSpecs.length: ${partitionSpecs.length}")
    

---结果---
splitPoints: Vector(0, 34, 68, 102, 136, 169, 202, 235, 268, 301, 334, 367)
splitPoints.length: 12   
partitionSpecs: Vector((0,0,34), (0,34,68), (0,68,102), (0,102,136), (0,136,169), (0,169,202), (0,202,235), (0,235,268), (0,268,301), (0,301,334), (0,334,367), (0,367,400), (1,0,34), (1,34,68), (1,68,102), (1,102,136), (1,136,169), (1,169,202), (1,202,235), (1,235,268), (1,268,301), (1,301,334), (1,334,367), (1,367,400), (2,0,34)。。。
partitionSpecs.length: 396
 

其中splitPoints就是设定一个map任务产生的reduce任务的分区规则,如:0,34,68代表0到33个分区(左闭右开)作为一个分区来读取,34到67作为一个分区来读取。
partitionSpecs就是具体设定一个map任务产生的哪几个分区的读取规则,如:(0,0,34) 索引为0的maptask产生的0到33个分区 由一个任务来读取。

那这种规则读取,spark是怎么实现的?我们直接定位到CustomShuffleReaderExec.scala:

private lazy val shuffleRDD: RDD[_] = {
    shuffleStage match {
      case Some(stage) =>
        sendDriverMetrics()
        stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
      case _ =>
        throw new IllegalStateException("operating on canonicalized plan")
    }
  }

  override protected def doExecute(): RDD[InternalRow] = {
    shuffleRDD.asInstanceOf[RDD[InternalRow]]
  }

我们可以看到在driver生成RDD的时候,会间接的调用到stage.shuffle.getShuffleRDD(partitionSpecs.toArray) 这个方法,
该方法在构造RDD的时候会把分区读取定义规则给作为参数传递进去,而这个方法在ShuffleExchangeExec的实现为:

override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[InternalRow] = {
    new ShuffledRowRDD(shuffleDependency, readMetrics, partitionSpecs)
  }

而在ShuffledRowRDD中的两个方法:

override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](partitionSpecs.length) { i =>
      ShuffledRowRDDPartition(i, partitionSpecs(i))
    }
  }

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
    // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
    // as well as the `tempMetrics` for basic shuffle metrics.
    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
    val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
        SparkEnv.get.shuffleManager.getReader(
          dependency.shuffleHandle,
          startReducerIndex,
          endReducerIndex,
          context,
          sqlMetricsReporter)

      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
        SparkEnv.get.shuffleManager.getReader(
          dependency.shuffleHandle,
          startMapIndex,
          endMapIndex,
          reducerIndex,
          reducerIndex + 1,
          context,
          sqlMetricsReporter)

      case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
        SparkEnv.get.shuffleManager.getReader(
          dependency.shuffleHandle,
          mapIndex,
          mapIndex + 1,
          startReducerIndex,
          endReducerIndex,
          context,
          sqlMetricsReporter)
    }
    reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
  }

其中getPartitions 这个方法会在计算的时候被触发,ShuffledRowRDDPartition(i, partitionSpecs(i)) 这个就是我们要读取的每一个分区,
compute就是shuffle计算的逻辑,会根据不同的规则,触发不同的case,
对应到CoalesceShufflePartitions就是CoalescedPartitionSpec.
对应到OptimizeLocalShuffleReader就是PartialMapperPartitionSpec,但是在spark 3.1.2版本的时候,还是存在一点不足,
优化完之后并不是读取多个map task产生的分区,这个在SPARK-36105中进行了补充。

为什么叫做OptimizeLocalShuffleReader

经过以上分析,物理计划在进行了OptimizeLocalShuffleReader规则之后,会产生分区读取规则,而这种分区读取规则是定义在map task产生的分区上的,
而DAGScheduler在进行任务调度的时候,会根据任务的亲和性(尽可能保证reduce任务能够跑到上游的map任务的所在的同一个executor上)来进行调度,
这样在shuffle 数据的读取阶段,针对于读取一个map task多个分区的情况来说是有很好的网络传输优化的。

具体的代码是在ShuffledRowRDD.scala:

override def getPreferredLocations(partition: Partition): Seq[String] = {
    val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
    partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
        // TODO order by partition size.
        startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
          tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
        }

      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
        tracker.getMapLocation(dependency, startMapIndex, endMapIndex)

      case PartialMapperPartitionSpec(mapIndex, _, _) =>
        tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
    }
  }

当然也可以参考这边篇what-new-apache-spark-3-local-shuffle-reader加深对LocalShuffleReader的理解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,056评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,842评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,938评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,296评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,292评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,413评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,824评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,493评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,686评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,502评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,553评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,281评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,820评论 3 305
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,873评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,109评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,699评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,257评论 2 341

推荐阅读更多精彩内容