上周在调优一个job ,发现spark history 出现一个重复出现的job , 这个job 写着Listing leaf files and directorioes for 55 paths ...... job 190 到job 197 全是一样的job. 这8个job 花了71 min.很奇怪。当然完成8个job 要花71min 也正常,executor 调度是要时间的。 不正常的是为什么要重复做一样的job .根据线索去查源码"Listing leaf files and directorioes for". 在https://github.com/apache/spark/blob/v2.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L199。 这段代码的注释说,这个函数的功能是循环遍历一组路径下的文件。会选择相应的策略来遍历路径。继续往下看,知道了他所谓的策略就是如果一组路径的数量 大于sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold( default value 32) ,就会开启一个job 去遍历所有partitions. 用spark job 去遍历路径下的文件至少是并发的,性能肯定要好点。
问题的是为什么要重复地去读路径下的文件呢,直到我在log 里面发现了这句话。
Evicting cached table partition metadata from memory due to size constraints
(spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance
大概的意思是table partition metadata 已经超过250M, 超过spark.sql.hive.filesourcePartitionFileCacheSize阈值了。会影响性能。根据线索『Evicting cached table partition metadata from memory due to size constraints 』去查询源码https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala#L126。这个函数会把表的partition 信息放到memory 里面。
到这里,我这里能猜出来就是这张表的partition 信息已经超过250M, 放不进内存,然后需要一次次地去遍历partition paths 获取partition 的信息例如文件大小,partition的字段信息。
然后我去查了下这张表partitions的信息。 一个partition 3g,居然被分成了3409 分片 , 平均一个分片文件1m.
解决方法: repartition,将3g 重新repartition 13g 文件,这样一个分片229M.
经过数据清洗之后(你看到的是这8个字,我大概洗了8个小时都不止了)。我重新跑了这个job ,只出现了一次『Listing leaf files and directorioes for 55 paths 』 。 当第一次遍历完这个表的partition 信息后, spark 就直接信息放内存里面了。不会再反复起job读文件了。 执行时间当然大幅度减少。
我当时在心里问了2个问题
1. 为什么spark 要收集partition 信息,收集了这些信息有什么用处?
spark 有些query 要做优化的时候,需要partition信息,where dt>'2020-01-01' and dt<'2020-02-01' 像partition 上的谓词下推是需要收集partition 的信息的。
2. 为什么spark 不问 hive metastore 查询去拿partition 信息呢,如果是其他格式例如 sequence file(这张hive table 是parquet 格式) ,是不是没有这个问题?
spark里面不是所有的datasource 都是hive。除了hive ,还有json, parquet, orc 其他格式,为了搜集这些数据格式的信息。spark 自己写了一个catalog. 但是无论是什么格式,如果一直有小文件的问题,那么问题也是一定存在的。