Spark Structured Streaming 写checkpoint 到HDFS 抖动的排查


问题描述

我们最近有个项目,需要实时消费订单成单的消息,提炼完数据后把结果写入HDFS,因此checkpointConfiguration 自然而然也采用默认配置写到HDFS中去(其实按照分布式集群模式的Spark,是必须配置到HDFS 兼容的目录)

项目采用Spark Structured Streaming 2.4,启用 window + watermark, 上游source 是Kafka,目前的watermark 是1分钟,抓取消息并没有采用triggerInterval,而是实时不断抓取,因此在水位没有到的时候一个Job 的平均duration 大概 1~3s,总共32个executors,也就是说期间将会有32个线程不断去HDFS写 checkpoint文件。

现在发现一个问题,就是:在平均1~3s 的作业当中,时不时会有耗时20s+ 的作业,查看结果并非是写结果导致,而是会有个别的executor 写checkpoint 到HDFS异常的慢,如下图


当我们开启了hadoop包路径的INFO日志级别时,我们会看到类似的日志



从日志上很清洗能看到,卡顿源于一个executor 在往HDFS 的tmp 目录写checkpoint 文件


checkpoint 机制科普

我们都知道,Spark 2.x 的checkpoint 机制是Spark 在运行过程中使用的一个中间状态保存的机制,在启用了诸如 window 功能时,是必须开启的,其归根的实现是Spark StateStore;
它主要实现了2中很重要的功能

  1. 它需要保存每次RDD转换时的中间状态数据,官方的名词叫增量式持续查询的实现 简单描述就是一个流式处理,必须标注每个executor当前处理到哪里了?必须把这个中间的结果先保存了(根据不同的配置可能是一个 内存+file 的组合)
  2. 每个executor当前完成作业的进度,比如一个executor 挂了,它需要从这个地方来恢复

在这里我打算详细介绍Spark 的StateStore了,关于它的设计机制和接口,我推荐大家阅读这篇介绍
Structured Streaming 之状态存储解析

那好,我们现在只需要简单知道,Spark 在处理流式程序的时候,它是需要有个地方来写这个checkpoint 信息的,并且是每个executor并发来写,那我们现在来研究一下它的文件长什么样,我们回到Spark 的官方文档,去看看checkpointing章节

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

从官方文档介绍看,一类文件名叫 metadata,主要是记录了一些 physical plan的一些信息,字面理解就是万一executor 挂了,它重启后需要从一个地方恢复 driver需要它干些什么活,而另外一类就是 delta 文件,这就是上面说的每次job 的中间RDD 的中间结果数据。


问题分析

那我们可以直接去HDFS上看看我们写的这些 文件


题外话这里再科普一下 snapshot 和delta的区别,spark会每个job每个executor都写一个delta,而到一定个数个delta就会汇总成一个snapshot,恢复的时候会先找snapshot,再汇聚这个snapshot之后的delta,这样来恢复一个现场

StateStore in Apache Spark Structured Streaming

从这个图上可以看到目前我们的程序的几个特点

  1. 现在每个executor 写的数据其实很小,最大不过几十K,但是HDFS block 是128M,这样32个线程算下来,小文件是挺多的
  2. 写入比较频繁,平均每3s就写一次了

其实对于一个繁忙的大HDFS集群来说,这两点都不是一个好事情


解决的一些思路

虽然我目前并无实质解决这个抖动问题,但是从分析来看我们会有一些思路,也在这里分享出来一起讨论:

  1. 首先,我觉得这应该不是写入文件的瓶颈,因为就算如何频繁,而且除了我们业务,还有非常多的业务同样有写入,如果我们写入都瓶颈那么整个集群早就到瓶颈了。那我先猜测卡顿应该是发生在 name node 的RPC 调用,是否在同一个 hdfs client 下对于并发调用是会有一定的资源竞争,那么我尝试去调大下面的参数
  'dfs.namenode.handler.count': '64'
  'dfs.datanode.handler.count': '64'

发现效果不明显

  1. 我们的业务其实对数据的有且仅有一次消费保障要求不明显,因为我们清洗电商平台的成单消息只是对里面的一些收益做一些粗略预算,就算中间挂掉机器,这部分数据我们是可以忽略,况且这种情况风险还是非常非常低,对比的话我们希望的是去掉这种写HDFS的延迟而尽可能加大消费的速度,所以第二个思路我们有想过写入宿主机mount的本地目录(我们的spark 已经上了k8s集群);这个方案我觉得在对数据不敏感的流式处理场景应该效率还是蛮高的,比如我们一个宿主机大概可以处理超过一般的executors,当executor 挂掉的时候,就算分配不到同样的宿主机,应该也是有机制可以获取到mount的目录(我对k8s了解不深,这只是我的猜测)最坏的结果是我不需要这批次的数据了。

  2. 把整个checkpoint 机制写到外部存储去,其实这样对整个计算环境也是有好处的,比如节省大量的内存,加个写的速度等等,我永远相信,我肯定不是第一个吐糟这个机制的人,总有人比你吐糟的早,解决的早,因此还是有不少的人开源了一些实现机制,比如下面兄弟实现了写入 rocketDB的一个方案

Custom state store providers for Apache Spark

  1. 终极大招,自己去实现--conf spark.sql.streaming.stateStore.providerClass 来覆盖Spark的默认实现org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
    上面的第3个点其实就是自己实现了一套
--conf spark.sql.streaming.stateStore.providerClass="ru.chermenin.spark.sql.execution.streaming.state.RocksDbStateStoreProvider"

只要照着这个来做一个,在spark启动是指定这个实现,即可搞一套完全自定义的checkpoint 机制,里面的具体接口大家可以直接看源代码,或者参考我上面的那篇文章,其实写的非常详细

StateStore in Apache Spark Structured Streaming


好了,目前我只想到这几种思路,如果大家也有遇到同样的问题,或者你已经有更好的办法的,欢迎推荐。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容