Spark Join优化-BucketJoin实现

偶然读取到了字节跳动关于Spark做的一些优化,发现其中一项被称为BuckedtJoin的优化项

传送门:Spark SQL 在字节跳动数据仓库领域的优化实践

而我曾经也实现过一个类似的解决方案,现在才知道这种方案有一个专业的名词BucketJoin。此篇我们来介绍以下,在不进行Spark源码修改的情况下,怎么实现BucketJoin

背景

Spark提供的Join方式主要有HashJoin、BroadcastJoin、SortMergeJoin;对应的使用条件如下:

  • BroadcastJoin:其中一个表非常小,可以被完整放入内存中;
  • HashJoin:两个表都很大,但其中一个表在进行Shuffle之后,在Reduce端的单partition数据能被加载导内存中;
  • SortMergeJoin:两个表都很大,且在Reduce端无法被加载导内存,只能通过两个有序迭代流进行命中比较;

它们的实现原理不再赘述,性能表现为:

BroadcastJoin > HashJoin > SortMergeJoin

BucketJoin 适用场景

有这样一种情况,Table 1 与 Table 2 内的数据已经按照相同的 Key 进行分桶且桶数都为 n,同时桶内按该 Key 排序。对这两张表进行 Join 时,可以避免 Shuffle,直接启动 n 个 Task 进行 Join,我们称之为BucketJoin。


BucketJoin

性能表现为

BroadcastJoin > BucketJoin > HashJoin > SortMergeJoin

1 BucketJoin 实现思路

解决思路:

BucketJoin在实现的过程中,根据数据量的不同,在拿到两个相同分区之后,可以有以下两种Join解决方案:

  1. right表可以放在内存中:映射为HashMap,进行命中查询即可;
  2. right表无法放在内存中:保证左右表有序,进行迭代流命中比较(类似SortMergeJoin在Reduce端的实现方案),如何保证有序,两种解法:
    2.1 数据生成时进行排序;
    2.2 读入数据后调用DataFrame.sortInPartition进行分区内排序;

2 基于Spark-RDD的实现方案

Spark有一个算子zipPartitions可以帮我们实现这个方案

 //preservesPartitioning:是否适用left表的Partitionner来执行计算
def zipPartitions[B: ClassTag, V: ClassTag]
  (rdd2: RDD[B], 
  preservesPartitioning: Boolean) 
  (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

这个算子执行的前提是两个RDD的分区数一致,计算时会将相同partitionId的数据放在同一个Task进行计算,计算规则就是传入的f:针对两个迭代器进行处理的匿名函数。

但是有一个问题,正常读入数据,Spark是不能保证HDFS上的数据和Partition的Id一致的,比如一个目录下的part-00000文件,读入后的partitionId就可能不是0,导致两个RDD的partition不能一一对应。怎么保证读入的partition能够按照我设想的情况,在HDFS上和RDD的partition是一致的呢?


靓仔懵逼

我们知道,sc.textFile等读数据方式,是可以按照路径的通配符,或者多个路径进行数据读入的。如果直接读取父目录,或者通配符,就不能保证数据分片和RDDpartition分片号对应的。但是如果这样读:

//path1,path0都对应文件而不是目录
val data=sc.textFile("path0,path1,path2,path3....path63")

那么读入的数据是按照路径的顺序生成对应partitionid的,读入的RDD的partition和路径对应规则为:

partition0:path0
partition1:path1
...
partition63:path63

不过别高兴太早,这么做可能得不到预想的结果,因为大部分的文件格式,是支持读入时进行split的,比如一个text类型文件有500MB,按照HDFS单个Block256MB的规则,会切分为256MB和246MB两个Partition,导致后边的partition又无法一一对应了,解决方案比较麻烦::

  - 用一种无法被split的文件格式或压缩方式进行存储;
  - 通过传参,或重写InputFormat,禁止数据块split;

解决好上述的问题后,那么我们可以这样分别读入两个表:

val leftRDD = sc.textFile("patha0,patha1,patha2,patha3....patha63")
val rightRDD = sc.textFile("pathb0,pathb1,pathb2,pathb3....pathb63")

3 BucketJoin-代码实现

示例选取了比较简单的情况,两个k-v表的Join:

RDD[(String, String)] + RDD[(String, String)] =  RDD[(String, Option[String], Option[String])]

3.1 BucketJoin-内存映射表实现

针对右表的数据可以放入内存的情况

def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
  val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD
    .zipPartitions(rightRDD) { (leftIter, rightIter) =>
      val rightMemMap = rightIter.toMap
      leftIter.map { case (k, v) => (k, Some(v), rightMemMap.get(k)) }
    }
}

3.2 BucketJoin-内存双流迭代比较实现

针对右表的数据无法放入内存的情况

def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
  val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD.zipPartitions(rightRDD) { (leftIter, rightIter) =>
    new Iterator[(String, Option[String], Option[String])]() {
      private var currentLeftData: (String, String) = _
      private var currentRightData: (String, String) = _

      override def hasNext: Boolean = {
        if (currentLeftData == null && leftIter.hasNext) currentLeftData = leftIter.next()
        if (currentRightData == null && rightIter.hasNext) currentRightData = rightIter.next()
        currentLeftData != null || currentRightData != null
      }

      override def next(): (String, Option[String], Option[String]) = {
        assert(hasNext)
        if (currentRightData.eq(null)) {
          leftOnly
        } else if (currentLeftData.eq(null)) {
          rightOnly
        } else {
          val compare = currentLeftData._1.compareTo(currentRightData._1)
          if (compare == 0) leftAndRight
          else if (compare < 0) leftOnly
          else rightOnly
        }
      }

      private def leftAndRight(): (String, Option[String], Option[String]) = {
        val currentLeft = currentLeftData
        val currentRight = currentRightData
        currentLeftData = null
        currentRightData = null
        (currentLeft._1, Some(currentLeft._2), Some(currentRight._2))
      }

      private def rightOnly(): (String, Option[String], Option[String]) = {
        val current: (String, String) = currentRightData
        currentRightData = null
        (current._1, Option.empty[String], Some(current._2))
      }

      private def leftOnly(): (String, Option[String], Option[String]) = {
        val current: (String, String) = currentLeftData
        currentLeftData = null
        (current._1, Some(current._2), Option.empty[String])
      }

    }
  }
}

上述就是总体的实现啦,目前的话,只有RDD是有zipPartitions算子的,所以想利用该加速只能在这一步的时候转换为RDD来进行计算。

4 注意事项

zipPartitions的preservesPartitioning参数默认为false,建议设置为true,这样就可以利用左表的Partition特性进行数据本地化的Task分配,配合一些数据本地化的参数调优,理想情况下,只需要较小的右表进行机器的读取,与做Shuffle的多次排序+多次落盘+多次序列化范序列化,不知道要快到哪。

如果需要读入后进行分片内排序,目前只能使用DateFrame.sortInPartitions,不过它是基于unsafeExternalSort进行排序的,要比基于Java对象排序快很多。

结语

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容