问题排查
- 看反压:背压高的下游oprator就是瓶颈
- 关注checkpoint时长
checkpoint时长一定程度影响系统吞吐 - 看核心指标
延迟指标、吞吐等 - 资源使用率
常见问题
- json序列化反序列化
通常在source和sink的task上,在指标上没有体现,容易被忽略(取消operator chain,查看反压) - 数据倾斜
数据倾斜影响系统吞吐 - 频繁GC
内存比例分配不合理导致频繁gc,影响吞吐甚至tm失联 - MAP和SET的hash冲突
有hashmap和hashset 随着负载因子的增高,引起插入和查询性能下降 - 和低速的系统交互
和低速的外部系统交互,mysql、hbase (增加应用缓存、积攒批次处理避免单条查询) - 大窗口
1)窗口size大、数据量大
2)滑动窗口size和step步长比例较大 eg size=5min,step=1s 同一条数据查法很多窗口的计算
数据倾斜影响
- 数据热点:数据集中在某些task中,数据不平衡
- GC频繁:过多的数据在jvm,导致内存资源短缺,触发频繁gc
- 吞吐下降,数据延迟增大
- 系统崩溃 过长的gc会导致tm和jm失联,系统崩溃
数据倾斜
1.源头
数据源消费不均匀,调整并行度(eg kakfa 分区 3个kafka分区 两个并行度 导致其中一个task处理2个kafka分区 另一个处理一个分区)
解决办法:通常source的并法度是kafka分区的整数倍
2.聚合场景
解决办法:两段聚合的方式(局部聚合+全局聚合)
方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。
方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数(注意:随机数范为选择为下游并发度),比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
内存调优
Flink中的总内存由JVM堆、manager memory和network buffers构成
对于容器化部署,总内存还可以包括一个容器预留内存
所有其他内存组件都是在启动Flink进程之前从total memory中计算出来的。启动后,manager memory和network buffers在某些情况下根据进程内可用的JVM内存进行调整
total memory 由taskmanager.heap.size指定
在Yarn or Mesos部署时,是请求容器大小
Container cut-off
引入截断是为了适应其他类型的内存消耗,而这些消耗在这个内存模型中没有考虑,例如RocksDB本机内存、JVM开销等。它也是一个安全余量,以防止容器超出其内存限制并被容器管理器杀死。
containerized.heap-cutoff-ratio :默认为0.25(占总内存的)
堆外内存调优
NetworkBuffer
taskmanager.network.memory.fraction:默认是0.1
taskmanager.network.memory.min:默认是64M
taskmanager.network.memory.max: 默认是1G
一般taskmanager.network.memory.fraction是0.1或小于0.1 根据使用情况调整ManagerBuffer :
taskmanager.memory.off-heap:true(默认是false不开启)
taskmanager.memory.fraction(默认是0.7)
考虑到流计算过程中ManagerBuffer没有使用,可以taskmanager.memory.fraction调整小于0.3
堆内内存调优
flink运行在jvm上,Flink使用的 Parallel Scavenge垃圾回收器可以改为G1
env.java.opts= -server -XX:+UseG1GC - XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails
内存模型的例子
TM总内存8G, cutoff:容器的预留内存(k8s、yarn)为8G * 0.25
taskmanager.memory.fraction设置,例如
0.8的值意味着TM为内部数据缓冲区保留了80%的内存(堆内或堆外,取决于taskmanager.memory.off . heap),将20%的空闲内存留给TM的堆,供用户定义的函数创建的对象使用。此参数仅在taskmanager.memory未设置是生效。
managed = (total - network) x fraction
heap = total - managed (if off-heap) - network
network = Min(max, Max(min, fraction x total)
- 作业failover的常见原因
jobmanager
zk访问超时
长时间GC
资源问题
主机故障(磁盘等) - taskmanager
上下游异常
数据问题
runtime异常
主机异常
延迟问题处理
- 延迟与吞吐
确定延时节点及时间 - 反压分析
找到反压节点
指标分析
查看一段时间的指标
堆栈
找到指定节点jvm进程、分析jstack等信息
相关日志
查看taskmanager日志是否有异常
作业性能问题
- 延时与吞吐
延时指标
tps吞吐
节点输入输出 - 反压
找出反压源节点
节点连接方式 shuffle、rebanlance、hash
节点个并发情况
业务逻辑、是否有正则、外部系统访问等
作业性能问题
- 指标
gc时间
gc次数
state checkpoint性能
外部系统访问延时 -
堆栈
节点所在taskmanager进程
查看线程PID
常见处理方式
调整节点并发
性能瓶颈问题增加并发调整节点资源
增加节点内存 cpu资源拆分节点
将chain在一起消耗资源较多的operator分开,增加并发作业集群优化
主键设置 数据去重 数据倾斜
GC参数
jobmanager参数
堆栈
failover信息补全,需要到job mamaner 中看更详细的日志
- 建立指标系统
延迟和吞吐是flink最重要的指标
tps 每秒有多少数据进入系统 消费是否能跟上生产 - 如何查看日志
yarn的container 日志和查看jobmanamger taskmanager 的日志
参考
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
https://blog.csdn.net/nazeniwaresakini/article/details/104220120?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-8