问题描述
我们最近有个项目,需要实时消费订单成单的消息,提炼完数据后把结果写入HDFS,因此checkpointConfiguration 自然而然也采用默认配置写到HDFS中去(其实按照分布式集群模式的Spark,是必须配置到HDFS 兼容的目录)
项目采用Spark Structured Streaming 2.4,启用 window + watermark, 上游source 是Kafka,目前的watermark 是1分钟,抓取消息并没有采用triggerInterval,而是实时不断抓取,因此在水位没有到的时候一个Job 的平均duration 大概 1~3s,总共32个executors,也就是说期间将会有32个线程不断去HDFS写 checkpoint文件。
现在发现一个问题,就是:在平均1~3s 的作业当中,时不时会有耗时20s+ 的作业,查看结果并非是写结果导致,而是会有个别的executor 写checkpoint 到HDFS异常的慢,如下图
当我们开启了hadoop包路径的INFO日志级别时,我们会看到类似的日志
从日志上很清洗能看到,卡顿源于一个executor 在往HDFS 的tmp 目录写checkpoint 文件
checkpoint 机制科普
我们都知道,Spark 2.x 的checkpoint 机制是Spark 在运行过程中使用的一个中间状态保存的机制,在启用了诸如 window 功能时,是必须开启的,其归根的实现是Spark StateStore;
它主要实现了2中很重要的功能
- 它需要保存每次RDD转换时的中间状态数据,官方的名词叫
增量式持续查询的实现
简单描述就是一个流式处理,必须标注每个executor当前处理到哪里了?必须把这个中间的结果先保存了(根据不同的配置可能是一个 内存+file 的组合) - 每个executor当前完成作业的进度,比如一个executor 挂了,它需要从这个地方来恢复
在这里我打算详细介绍Spark 的StateStore了,关于它的设计机制和接口,我推荐大家阅读这篇介绍
Structured Streaming 之状态存储解析
那好,我们现在只需要简单知道,Spark 在处理流式程序的时候,它是需要有个地方来写这个checkpoint 信息的,并且是每个executor并发来写,那我们现在来研究一下它的文件长什么样,我们回到Spark 的官方文档,去看看checkpointing章节
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
从官方文档介绍看,一类文件名叫 metadata,主要是记录了一些 physical plan的一些信息,字面理解就是万一executor 挂了,它重启后需要从一个地方恢复 driver需要它干些什么活,而另外一类就是 delta 文件,这就是上面说的每次job 的中间RDD 的中间结果数据。
问题分析
那我们可以直接去HDFS上看看我们写的这些 文件
题外话这里再科普一下 snapshot 和delta的区别,spark会每个job每个executor都写一个delta,而到一定个数个delta就会汇总成一个snapshot,恢复的时候会先找snapshot,再汇聚这个snapshot之后的delta,这样来恢复一个现场
从这个图上可以看到目前我们的程序的几个特点
- 现在每个executor 写的数据其实很小,最大不过几十K,但是HDFS block 是128M,这样32个线程算下来,小文件是挺多的
- 写入比较频繁,平均每3s就写一次了
其实对于一个繁忙的大HDFS集群来说,这两点都不是一个好事情
解决的一些思路
虽然我目前并无实质解决这个抖动问题,但是从分析来看我们会有一些思路,也在这里分享出来一起讨论:
- 首先,我觉得这应该不是写入文件的瓶颈,因为就算如何频繁,而且除了我们业务,还有非常多的业务同样有写入,如果我们写入都瓶颈那么整个集群早就到瓶颈了。那我先猜测卡顿应该是发生在 name node 的RPC 调用,是否在同一个 hdfs client 下对于并发调用是会有一定的资源竞争,那么我尝试去调大下面的参数
'dfs.namenode.handler.count': '64'
'dfs.datanode.handler.count': '64'
发现效果不明显
我们的业务其实对数据的
有且仅有一次消费保障
要求不明显,因为我们清洗电商平台的成单消息只是对里面的一些收益做一些粗略预算,就算中间挂掉机器,这部分数据我们是可以忽略,况且这种情况风险还是非常非常低,对比的话我们希望的是去掉这种写HDFS的延迟而尽可能加大消费的速度,所以第二个思路我们有想过写入宿主机mount的本地目录(我们的spark 已经上了k8s集群);这个方案我觉得在对数据不敏感的流式处理场景应该效率还是蛮高的,比如我们一个宿主机大概可以处理超过一般的executors,当executor 挂掉的时候,就算分配不到同样的宿主机,应该也是有机制可以获取到mount的目录(我对k8s了解不深,这只是我的猜测)最坏的结果是我不需要这批次的数据了。把整个checkpoint 机制写到外部存储去,其实这样对整个计算环境也是有好处的,比如节省大量的内存,加个写的速度等等,我永远相信,我肯定不是第一个吐糟这个机制的人,总有人比你吐糟的早,解决的早,因此还是有不少的人开源了一些实现机制,比如下面兄弟实现了写入 rocketDB的一个方案
- 终极大招,自己去实现
--conf spark.sql.streaming.stateStore.providerClass
来覆盖Spark的默认实现org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
上面的第3个点其实就是自己实现了一套
--conf spark.sql.streaming.stateStore.providerClass="ru.chermenin.spark.sql.execution.streaming.state.RocksDbStateStoreProvider"
只要照着这个来做一个,在spark启动是指定这个实现,即可搞一套完全自定义的checkpoint 机制,里面的具体接口大家可以直接看源代码,或者参考我上面的那篇文章,其实写的非常详细
好了,目前我只想到这几种思路,如果大家也有遇到同样的问题,或者你已经有更好的办法的,欢迎推荐。