上周调优一个job 的时候发现一个join 意外的耗时间,如图上一个join的shuffle操作就耗时1.2h. Input 才91GB, shuffle write 525.5GB. 但是花了1.2h.
看看里面的task , Median 就是42min, max 1.2h . 虽然明显是有拖尾的现象,但是Median就42min ,不是skew 造成的。 Median 的输入是203M,Shuffle write 1171MB, Shuffle Spill(Memory) 14.6G, Shuffle Spill(Disk) 2.2GB.看上去是executor.memory 不够才造成的shuffle的spill 耗时间。
解决思路:1. 把单个input 变小 2. 调大executor memory . 这次用第一个没有用第二个的原因是这个application 有很多query,如果调大executor memory, 那么其他query 事实上不需要那么大的memory也会跟着一起用较大的executor memory setting.
首先尝试了spark.sql.files.maxPartitionBytes=33554432 , 把单个partition最大读数据量控制在32M。但是执行时间依然是这样,从history server 上看每个task 的输入数据还是200MB ,没有变化。
val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
.createWithDefault(128 * 1024 * 1024) // parquet.block.size
重新去看源码,源码:
每个task接受的input 的确可以通过spark.sql.files.maxPartitionBytes 来控制,但是对于文件格式是有要求的,fsRelation.fileFormat.isSplitable为true才能根据参数分割输入,isSplittable的源码是这样:输入是否能分割和文件格式text,parquet,orc,json没有关系,只和文件格式对应的压缩算法(下面的codec)有关系。
abstract class TextBasedFileFormat extends FileFormat {
private var codecFactory: CompressionCodecFactory = _
override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
if (codecFactory == null) {
codecFactory = new CompressionCodecFactory(
sparkSession.sessionState.newHadoopConfWithOptions(options))
}
val codec = codecFactory.getCodec(path)
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
}
常见的压缩算法是GZIP,LZO,SNAPPY, GZIP是不能分割的,也就是说一个文件(gz结尾)1GB,怎么设置参数,每个task的输入还是1GB.
看到这里可以写个spark单元测试来验证一下这个spark.sql.files.maxPartitionBytes,先设置成2 , 这个单元测试的用意是看在读数据的时候,spark是否把每个task的输入控制在2byte. 首先在/tmp/seq 下写了一份数据。看了下的确产生了12个partition. 然后读数据的时候看到底用了多少partitions. 看了下最后用了147个分片来读数据。说明这个参数是有效果的。
override def conf: SparkConf = {
val conf = super.conf
conf.set("spark.sql.files.maxPartitionBytes","2")
conf
}
test("test spark.sql.files.maxPartitionBytes "){
val df1 = spark.range(0, 100).selectExpr("CAST(id AS STRING) AS s")
//original 12partitions
df1.write.mode("overwrite").format("text").text("/tmp/seq")
val df2 = spark.read.format("text")load("/tmp/seq")
println("size:"+getFileScanRDD(df2).filePartitions.size) // 147 partitions
}
重新写个单元测试,写数据的时候用GZIP 压缩下, 看看读数据的时候能否还是用147个partition去读,跑完你就可以发现原来写的时候写了12个partition,现在读文件还是12个partition. 这个时候这个参数根本没有用
override def conf: SparkConf = {
val conf = super.conf
conf.set("spark.sql.files.maxPartitionBytes","2")
conf
}
test("test spark.sql.files.maxPartitionBytes "){
val df3 = spark.range(0, 100).selectExpr("CAST(id AS STRING) AS s")
//original 12partitions
df3.write.option("compression", "gzip").mode("overwrite").format("text").text("/tmp/compressed")
val df4 = spark.read.format("text")load("/tmp/compressed")
println("size:"+getFileScanRDD(df4).filePartitions.size) //size:12
}
回头再看最前面的那个问题,原来之所以参数设置了不起效果,就是因为原来的表的格式是SequenceFile且它的压缩格式是GZIP。解决方法重新生成表让它不带压缩的格式或者你选个可以分割的压缩算法就能让后面读数据的时候spark.sql.files.maxPartitionBytes 生效了。来看看效果 Median 34s 即使task的个数从原来的459个变成了3655个,每个task 的输入从原来的200M变成了32M, 减小了6倍,task个数变成了原来的7倍(的确没有压缩的话表的确是在空间上是变大了一点)。 但是每个task的执行时间从原来的42min变成了31s. job 的执行时间肯定是大幅度下降( job 跑得快快的)
总结一下整个文章想表达的意思
1. task 输入太大会对shuffle造成spill 到disk 的额外耗时(当然你的executor memory 如果足够大是不会出现这个问题的),请注意这个case 的输入才200M,也不大,但是对shuffle 也造成要spill到disk 的压力。原因就是输入是使用了压缩算法(GZIP), 200M的输入放到内存可以预计扩大到3~5倍(见下面每个压缩算法的压缩率),也就是600M~1G.
2. 可以通过设置spark.sql.files.maxPartitionBytes 来分割每个task 的输入。 但是配合不同的压缩算法,压缩算法是否可以被分割又决定了输入是否可以被分割。
3. 如何看你的输入是使用了什么分割算法,看分片文件的后缀。下面这个分片是GZIP格式。
part-00000-76a8013e-8a5e-4c7d-8ae6-09368920561b-c000.txt.gz。
4. 总结一下各个压缩算法的压缩率和是否可以被分割,从下图可以看到 GZIP的压缩率的确是最高的,但是GZIP是不可以分割的。