由前文我们知道,StreamGraph 表示一个流任务的逻辑拓扑,可以用一个 DAG 来表示(代码实现上没有一个 DAG 结构),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。本文我们主要介绍一个 StreamGraph 是如何转换成一个 JobGraph。
一、JobGraph 概述
- JobGraph 将会在原来的基础上做相应的优化(主要是算子的 Chain 操作,Chain 在一起的算子将会在同一个 task 上运行,会极大减少 shuffle 的开销)
- JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(
JobVertex
)、中间结果(IntermediateDataSet
)和边(JobEdge
)组成的 DAG 图 - JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置
为什么要有 StreamGraph 和 JobGraph 两层的 Graph,最主要的原因是为兼容 batch process,Streaming process 最初产生的是 StreamGraph,而 batch process 产生的则是 OptimizedPlan,但是它们最后都会转换为 JobGraph
1.1、JobVertex
JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode。主要成员:
-
List<OperatorIDPair> operatorIDs
:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids -
ArrayList<JobEdge> inputs
:带输入数据的边列表 -
ArrayList<IntermediateDataSet> results
:job 节点计算出的中间结果
1.2、IntermediateDataSet
它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集。中间数据集可能会被其他 operators 读取,物化或丢弃。主要成员:
-
JobVertex producer
:该中间结果的生产者 -
List<JobEdge> consumers
:该中间结果消费边,通过消费边指向消费的节点 -
ResultPartitionType resultType
:中间结果的分区类型- 流水线的(有界的或无界的):一旦产生数据就向下游发送,可能是逐个发送的,有界或无界的记录流。
- 阻塞:仅在生成完整结果时向下游发送数据。
1.3、JobEdge
它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。主要成员:
-
IntermediateDataSet sourc
:边的源 -
JobVertex target
:边的目标 -
DistributionPattern distributionPattern
:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式-
ALL_TO_ALL
:每个生产子任务都连接到消费任务的每个子任务 -
POINTWISE
:每个生产子任务都连接到使用任务的一个或多个子任务
-
二、Create Job Graph 主要流程
2.1、核心步骤
2.2、setChaining
从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例
这个方法首先从会遍历这个 StreamGraph 的所有 source 节点,然后选择从 source 节点开始执行 createChain()
方法,在具体的实现里,主要逻辑如下
总结下这个流程:
- 从输入节点开始,判断边的输出节点能否加入到该 chain
- 如果可以,则继续从输出节点执行扩展该 chain
- 否则,当前 chain 结束,以输出节点为初始节点,递归创建新的 chain
- 如果当前节点为 chain 的首节点,那么就创建一个 JobVertex,否则创建 StreamConfig,记录到 chainedConfigs(由于调用链上后面的节点先创建,因此创建首节点的 JobVertex 时,就可以使用 chainedConfigs 记录的信息了)
其中 JobEdge 是通过下游 JobVertex 的 connectNewDataSetAsInput
方法来创建的,在创建 JobEdge 之前,会先用上游 JobVertex 创建一个 IntermediateDataSet 实例,用来作为上游 JobVertex 的结果输出,然后作为 JobEdge 的输入,构建JobEdge实例,具体实现如下:
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
/** 创建输入JobVertex的输出数据集合 */
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
/** 构建 JobEdge 实例 */
JobEdge edge = new JobEdge(dataSet, this, distPattern);
/** 将 JobEdge 实例,作为当前 JobVertex 的输入 */
this.inputs.add(edge);
/** 设置中间结果集合 dataSet 的消费者是上面创建的 JobEdge */
dataSet.addConsumer(edge);
return edge;
}
通过上述的构建过程,就可以实现上下游 JobVertex 的连接,上游 JobVertex ——> 中间结果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex
。其中:
- IntermediateDataSet 和 JobEdge 是用来建立上下游 JobVertex 之间连接的配置
- 一个 IntermediateDataSet 有一个 producer,可以有多个消费者 JobEdge
- 一个 JobEdge 则有一个数据源 IntermediateDataSet,一个目标JobVertex
- 一个 JobVertex 可以产生多个输出 IntermediateDataSet,也可以接受来自多个 JobEdge 的数据
2.3、算子 Chainable 的依据
isChainable()
的判断依据如下:
return downStreamVertex.getInEdges().size() == 1 //
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 对应的 slotSharingGroup 一样
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允许 chain 操作
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允许跟后面的 chain 在一起
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 类型
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并发相等
&& streamGraph.isChainingEnabled(); // StreamGraph 允许 Chain 在一起
2.3.1、slotSharingGroup
一个 StreamNode 的 SlotSharingGroup 会按照下面这个逻辑来确定:
- 如果用户指定了 SlotSharingGroup,直接使用这个 SlotSharingGroup name;
- 如果所有的 input 都是同一个 group name,使用这个即可;
- 否则使用 default group;
2.3.2、edge.getPartitioner()
StreamPartitioner 的实现
用户可以在自己的代码中调用 DataStream API (比如:broadcast()
、shuffle()
等)配置相应的 StreamPartitioner,如果这个没有指定 StreamPartitioner 的话,则会走下面的逻辑创建默认的 StreamPartitioner:
//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的话,会为其选择 forward(并发设置相同时) 或 rebalance(并发设置不同时)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}