1 前言
State要能发挥作用,就需要持久化到可靠存储中,flink中持久化的动作就是checkpointing,那么从TM中执行的Task的基类StreamTask的checkpoint逻辑说起。
2 源码解析
<1> StreamTask
类作用说明:
所有流任务的基础类,一个StreamTask是由TaskManagers部署并且运行的本地处理的集合,每一个StreamTask运行来自算子链上的一个或者多个StreamOperator。
链接在一起的算子在同一个线程,也在同一个流分区上,这些算子链就是连续的map/flatMap/filter任务。StreamTask的生命周期
(1)setInitialState:用来提供算子链上所有算子的状态
(2)invoke:
- 关于checkpoint方法
triggerCheckpoint方法:checkpoint协调器异步调用此方法以触发检查点。
performCheckpoint方法:
(1)如上所示,可以进行checkpoint操作。
Step1:准备checkpoint工作,允许算子进行一些pre-barrier工作。在通常情况下,pre-barrier工作应该为零或最少。
(通常是在算子输出其checkpoint barrier之前,需要进行快照时调用;此方法方法不应用于任何实际状态快照逻辑,因为它本质上将在操作员检查站的同步部分之内。 如果在此方法中完成繁重的工作,它将影响等待时间和下游检查点对齐。)
Step2:将checkpoint barrier发送到下游。
Step3:进行状态快照,这应该很大程度上是异步操作,以免影响流式拓扑的进度。
(2)不能进行广播操作,则将CancelCheckpointMarker广播出去
checkpointState方法:
(1)创建CheckpointStreamFactory(检查点输出流工厂,用于保留检查点的数据)
该接口的实现类或继承接口:
由CheckpointStorage实例调用resolveCheckpointStorageLocation方法获取工厂
(2)创建CheckpointingOperation实例,并执行executeCheckpointing方法,在该方法中,会遍历每一个算子,进行checkpointStreamOperator操作;
该操作就是对当前算子进行snapshotState操作。
下面开始进入StreamOperator的snapshotState方法。
<2> StreamOperator的snapshotState方法
StreamOperator源码简析
Apache Flink源码解析 (四)Stream Operator
算子调用该方法进行状态的快照操作,而该方法的基本实现是在AbstractStreamOperator中的snapshotState方法。
在该方法中,分别调用了OperatorStateBackend和KeyedStateBackend的snapshot方法。
特别注意,在调用这两个方法之前的snapshotState(snapshotContext)这个调用,它一方面实现了Raw的State的snapshot,一方面也实现了用户自定义的函数的State的更新。
snapshotState(snapshotContext)方法:
对于具有状态的流运算符,想要加入快照需要重写此hook方法。
什么是CheckpointStateOutputStream?
不同的StateBackEnd会有不同的实现,会返回不同的CheckpointStateOutputStream实现,比如FsStateBackEnd会构造文件流,而MemoryStateBackEnd就会构造ByteAttayOutputStream。
而CheckpointStateOutputStream会作为IO代理包含在KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream内。
KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream 分别需要记录额外的状态。KeyedStateCheckpointOutputStream 需要记录每个keyGroup起始在流中的位置, OperatorStateCheckpointOutputStream 需要记录每个partition起始在流中的位置, 这些信息都会体现在对应的StreamStateHandle中.
todo!!!!
operatorStateBackend和keyedStateBackend的snapshot方法:
将stateBackend备份到用户指定的文件系统。
snapshot方法是在接口SnapshotStrategy中定义,SnapshotStrategy的实现类如下: