Flink 作业生成②:StreamGraph -> JobGraph

由前文我们知道,StreamGraph 表示一个流任务的逻辑拓扑,可以用一个 DAG 来表示(代码实现上没有一个 DAG 结构),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。本文我们主要介绍一个 StreamGraph 是如何转换成一个 JobGraph。

一、JobGraph 概述

  • JobGraph 将会在原来的基础上做相应的优化(主要是算子的 Chain 操作,Chain 在一起的算子将会在同一个 task 上运行,会极大减少 shuffle 的开销)
  • JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(JobVertex)、中间结果(IntermediateDataSet)和边(JobEdge)组成的 DAG 图
  • JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置

为什么要有 StreamGraph 和 JobGraph 两层的 Graph,最主要的原因是为兼容 batch process,Streaming process 最初产生的是 StreamGraph,而 batch process 产生的则是 OptimizedPlan,但是它们最后都会转换为 JobGraph

1.1、JobVertex

JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode。主要成员:

  • List<OperatorIDPair> operatorIDs:该 job 节点包含的所有 operator ids,以深度优先方式存储 ids
  • ArrayList<JobEdge> inputs:带输入数据的边列表
  • ArrayList<IntermediateDataSet> results:job 节点计算出的中间结果

1.2、IntermediateDataSet

它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集。中间数据集可能会被其他 operators 读取,物化或丢弃。主要成员:

  • JobVertex producer:该中间结果的生产者
  • List<JobEdge> consumers:该中间结果消费边,通过消费边指向消费的节点
  • ResultPartitionType resultType:中间结果的分区类型
    • 流水线的(有界的或无界的):一旦产生数据就向下游发送,可能是逐个发送的,有界或无界的记录流。
    • 阻塞:仅在生成完整结果时向下游发送数据。

1.3、JobEdge

它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。主要成员:

  • IntermediateDataSet sourc:边的源
  • JobVertex target:边的目标
  • DistributionPattern distributionPattern:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
    • ALL_TO_ALL:每个生产子任务都连接到消费任务的每个子任务
    • POINTWISE:每个生产子任务都连接到使用任务的一个或多个子任务

二、Create Job Graph 主要流程

2.1、核心步骤

2.2、setChaining

从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例

这个方法首先从会遍历这个 StreamGraph 的所有 source 节点,然后选择从 source 节点开始执行 createChain() 方法,在具体的实现里,主要逻辑如下

总结下这个流程:

  1. 从输入节点开始,判断边的输出节点能否加入到该 chain
    • 如果可以,则继续从输出节点执行扩展该 chain
    • 否则,当前 chain 结束,以输出节点为初始节点,递归创建新的 chain
  2. 如果当前节点为 chain 的首节点,那么就创建一个 JobVertex,否则创建 StreamConfig,记录到 chainedConfigs(由于调用链上后面的节点先创建,因此创建首节点的 JobVertex 时,就可以使用 chainedConfigs 记录的信息了)

其中 JobEdge 是通过下游 JobVertex 的 connectNewDataSetAsInput 方法来创建的,在创建 JobEdge 之前,会先用上游 JobVertex 创建一个 IntermediateDataSet 实例,用来作为上游 JobVertex 的结果输出,然后作为 JobEdge 的输入,构建JobEdge实例,具体实现如下:

public JobEdge connectNewDataSetAsInput(
      JobVertex input,
      DistributionPattern distPattern,
      ResultPartitionType partitionType) {
   /** 创建输入JobVertex的输出数据集合 */
   IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
   /** 构建 JobEdge 实例 */
   JobEdge edge = new JobEdge(dataSet, this, distPattern);
   /** 将 JobEdge 实例,作为当前 JobVertex 的输入 */
   this.inputs.add(edge);
   /** 设置中间结果集合 dataSet 的消费者是上面创建的 JobEdge */
   dataSet.addConsumer(edge);
   return edge;
}

通过上述的构建过程,就可以实现上下游 JobVertex 的连接,上游 JobVertex ——> 中间结果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex。其中:

  • IntermediateDataSet 和 JobEdge 是用来建立上下游 JobVertex 之间连接的配置
  • 一个 IntermediateDataSet 有一个 producer,可以有多个消费者 JobEdge
  • 一个 JobEdge 则有一个数据源 IntermediateDataSet,一个目标JobVertex
  • 一个 JobVertex 可以产生多个输出 IntermediateDataSet,也可以接受来自多个 JobEdge 的数据

2.3、算子 Chainable 的依据

isChainable() 的判断依据如下:

return downStreamVertex.getInEdges().size() == 1 // 
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 对应的 slotSharingGroup 一样
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允许 chain 操作
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允许跟后面的 chain 在一起
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 类型
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并发相等
            && streamGraph.isChainingEnabled(); // StreamGraph 允许 Chain 在一起

2.3.1、slotSharingGroup

一个 StreamNode 的 SlotSharingGroup 会按照下面这个逻辑来确定:

  1. 如果用户指定了 SlotSharingGroup,直接使用这个 SlotSharingGroup name;
  2. 如果所有的 input 都是同一个 group name,使用这个即可;
  3. 否则使用 default group;

2.3.2、edge.getPartitioner()

StreamPartitioner 的实现

用户可以在自己的代码中调用 DataStream API (比如:broadcast()shuffle() 等)配置相应的 StreamPartitioner,如果这个没有指定 StreamPartitioner 的话,则会走下面的逻辑创建默认的 StreamPartitioner:

//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的话,会为其选择 forward(并发设置相同时) 或 rebalance(并发设置不同时)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

三、参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容