偶然读取到了字节跳动关于Spark做的一些优化,发现其中一项被称为BuckedtJoin的优化项
传送门:Spark SQL 在字节跳动数据仓库领域的优化实践
而我曾经也实现过一个类似的解决方案,现在才知道这种方案有一个专业的名词BucketJoin。此篇我们来介绍以下,在不进行Spark源码修改的情况下,怎么实现BucketJoin
背景
Spark提供的Join方式主要有HashJoin、BroadcastJoin、SortMergeJoin;对应的使用条件如下:
- BroadcastJoin:其中一个表非常小,可以被完整放入内存中;
- HashJoin:两个表都很大,但其中一个表在进行Shuffle之后,在Reduce端的单partition数据能被加载导内存中;
- SortMergeJoin:两个表都很大,且在Reduce端无法被加载导内存,只能通过两个有序迭代流进行命中比较;
它们的实现原理不再赘述,性能表现为:
BroadcastJoin > HashJoin > SortMergeJoin
BucketJoin 适用场景
有这样一种情况,Table 1 与 Table 2 内的数据已经按照相同的 Key 进行分桶且桶数都为 n,同时桶内按该 Key 排序。对这两张表进行 Join 时,可以避免 Shuffle,直接启动 n 个 Task 进行 Join,我们称之为BucketJoin。
性能表现为
BroadcastJoin > BucketJoin > HashJoin > SortMergeJoin
1 BucketJoin 实现思路
解决思路:
BucketJoin在实现的过程中,根据数据量的不同,在拿到两个相同分区之后,可以有以下两种Join解决方案:
- right表可以放在内存中:映射为HashMap,进行命中查询即可;
- right表无法放在内存中:保证左右表有序,进行迭代流命中比较(类似SortMergeJoin在Reduce端的实现方案),如何保证有序,两种解法:
2.1 数据生成时进行排序;
2.2 读入数据后调用DataFrame.sortInPartition进行分区内排序;
2 基于Spark-RDD的实现方案
Spark有一个算子zipPartitions可以帮我们实现这个方案
//preservesPartitioning:是否适用left表的Partitionner来执行计算
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B],
preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
这个算子执行的前提是两个RDD的分区数一致,计算时会将相同partitionId的数据放在同一个Task进行计算,计算规则就是传入的f:针对两个迭代器进行处理的匿名函数。
但是有一个问题,正常读入数据,Spark是不能保证HDFS上的数据和Partition的Id一致的,比如一个目录下的part-00000文件,读入后的partitionId就可能不是0,导致两个RDD的partition不能一一对应。怎么保证读入的partition能够按照我设想的情况,在HDFS上和RDD的partition是一致的呢?
我们知道,sc.textFile等读数据方式,是可以按照路径的通配符,或者多个路径进行数据读入的。如果直接读取父目录,或者通配符,就不能保证数据分片和RDDpartition分片号对应的。但是如果这样读:
//path1,path0都对应文件而不是目录
val data=sc.textFile("path0,path1,path2,path3....path63")
那么读入的数据是按照路径的顺序生成对应partitionid的,读入的RDD的partition和路径对应规则为:
partition0:path0
partition1:path1
...
partition63:path63
不过别高兴太早,这么做可能得不到预想的结果,因为大部分的文件格式,是支持读入时进行split的,比如一个text类型文件有500MB,按照HDFS单个Block256MB的规则,会切分为256MB和246MB两个Partition,导致后边的partition又无法一一对应了,解决方案比较麻烦::
- 用一种无法被split的文件格式或压缩方式进行存储;
- 通过传参,或重写InputFormat,禁止数据块split;
解决好上述的问题后,那么我们可以这样分别读入两个表:
val leftRDD = sc.textFile("patha0,patha1,patha2,patha3....patha63")
val rightRDD = sc.textFile("pathb0,pathb1,pathb2,pathb3....pathb63")
3 BucketJoin-代码实现
示例选取了比较简单的情况,两个k-v表的Join:
RDD[(String, String)] + RDD[(String, String)] = RDD[(String, Option[String], Option[String])]
3.1 BucketJoin-内存映射表实现
针对右表的数据可以放入内存的情况
def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD
.zipPartitions(rightRDD) { (leftIter, rightIter) =>
val rightMemMap = rightIter.toMap
leftIter.map { case (k, v) => (k, Some(v), rightMemMap.get(k)) }
}
}
3.2 BucketJoin-内存双流迭代比较实现
针对右表的数据无法放入内存的情况
def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD.zipPartitions(rightRDD) { (leftIter, rightIter) =>
new Iterator[(String, Option[String], Option[String])]() {
private var currentLeftData: (String, String) = _
private var currentRightData: (String, String) = _
override def hasNext: Boolean = {
if (currentLeftData == null && leftIter.hasNext) currentLeftData = leftIter.next()
if (currentRightData == null && rightIter.hasNext) currentRightData = rightIter.next()
currentLeftData != null || currentRightData != null
}
override def next(): (String, Option[String], Option[String]) = {
assert(hasNext)
if (currentRightData.eq(null)) {
leftOnly
} else if (currentLeftData.eq(null)) {
rightOnly
} else {
val compare = currentLeftData._1.compareTo(currentRightData._1)
if (compare == 0) leftAndRight
else if (compare < 0) leftOnly
else rightOnly
}
}
private def leftAndRight(): (String, Option[String], Option[String]) = {
val currentLeft = currentLeftData
val currentRight = currentRightData
currentLeftData = null
currentRightData = null
(currentLeft._1, Some(currentLeft._2), Some(currentRight._2))
}
private def rightOnly(): (String, Option[String], Option[String]) = {
val current: (String, String) = currentRightData
currentRightData = null
(current._1, Option.empty[String], Some(current._2))
}
private def leftOnly(): (String, Option[String], Option[String]) = {
val current: (String, String) = currentLeftData
currentLeftData = null
(current._1, Some(current._2), Option.empty[String])
}
}
}
}
上述就是总体的实现啦,目前的话,只有RDD是有zipPartitions算子的,所以想利用该加速只能在这一步的时候转换为RDD来进行计算。
4 注意事项
zipPartitions的preservesPartitioning参数默认为false,建议设置为true,这样就可以利用左表的Partition特性进行数据本地化的Task分配,配合一些数据本地化的参数调优,理想情况下,只需要较小的右表进行机器的读取,与做Shuffle的多次排序+多次落盘+多次序列化范序列化,不知道要快到哪。
如果需要读入后进行分片内排序,目前只能使用DateFrame.sortInPartitions,不过它是基于unsafeExternalSort进行排序的,要比基于Java对象排序快很多。