前言
这篇是昨晚没写完的,今晚补全发出来。
Flink算子链简介
“为什么我的Flink作业Web UI中只显示出了一个框,并且Records Sent和Records Received指标都是0?是我的程序写得有问题吗?”
笔者在Flink社区群里经常能看到类似这样的疑问。这种情况几乎都不是程序有问题,而是因为Flink的operator chain——即算子链机制导致的,即提交的作业的执行计划中,所有算子的并发实例(即sub-task)都因为满足特定条件而串成了整体来执行,自然就观察不到算子之间的数据流量了。
当然上述是一种特殊情况。我们更常见到的是只有部分算子得到了算子链机制的优化,如官方文档中出现过多次的下图所示,注意Source和map()算子。
算子链机制的好处是显而易见的:所有chain在一起的sub-task都会在同一个线程(即TaskManager的slot)中执行,能够减少不必要的数据交换、序列化和上下文切换,从而提高作业的执行效率。
铺垫了这么多,接下来就通过源码简单看看算子链产生的条件,以及它是如何在Flink Runtime中实现的。
逻辑计划中的算子链
对Flink Runtime稍有了解的看官应该知道,Flink作业的执行计划会用三层图结构来表示,即:
- StreamGraph——原始逻辑执行计划
- JobGraph——优化的逻辑执行计划(Web UI中看到的就是这个)
- ExecutionGraph——物理执行计划
算子链是在优化逻辑计划时加入的,也就是由StreamGraph生成JobGraph的过程中。那么我们来到负责生成JobGraph的o.a.f.streaming.api.graph.StreamingJobGraphGenerator类,查看其核心方法createJobGraph()的源码。
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
// 略......
return jobGraph;
}
可见,该方法会先计算出StreamGraph中各个节点的哈希码作为唯一标识,并创建一个空的Map结构保存即将被链在一起的算子的哈希码,然后调用setChaining()方法,如下源码所示。
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
可见是逐个遍历StreamGraph中的Source节点,并调用createChain()方法。createChain()是逻辑计划层创建算子链的核心方法,完整源码如下,有点长。
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
先解释一下方法开头创建的3个List结构:
- transitiveOutEdges:当前算子链在JobGraph中的出边列表,同时也是createChain()方法的最终返回值;
- chainableOutputs:当前能够链在一起的StreamGraph边列表;
- nonChainableOutputs:当前不能够链在一起的StreamGraph边列表。
接下来,从Source开始遍历StreamGraph中当前节点的所有出边,调用isChainable()方法判断是否可以被链在一起(这个判断逻辑稍后会讲到)。可以链接的出边被放入chainableOutputs列表,否则放入nonChainableOutputs列表。
对于chainableOutputs中的边,就会以这些边的直接下游为起点,继续递归调用createChain()方法延展算子链。对于nonChainableOutputs中的边,由于当前算子链的延展已经到头,就会以这些“断点”为起点,继续递归调用createChain()方法试图创建新的算子链。也就是说,逻辑计划中整个创建算子链的过程都是递归的,亦即实际返回时,是从Sink端开始返回的。
然后要判断当前节点是不是算子链的起始节点。如果是,则调用createJobVertex()方法为算子链创建一个JobVertex(即JobGraph中的节点),也就形成了我们在Web UI中看到的JobGraph效果:
最后,还需要将各个节点的算子链数据写入各自的StreamConfig中,算子链的起始节点要额外保存下transitiveOutEdges。StreamConfig在后文的物理执行阶段会再次用到。
形成算子链的条件
来看看isChainable()方法的代码。
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
由此可得,上下游算子能够chain在一起的条件还是非常苛刻的(老生常谈了),列举如下:
- 上下游算子实例处于同一个SlotSharingGroup中(之后再提);
- 下游算子的链接策略(ChainingStrategy)为ALWAYS——既可以与上游链接,也可以与下游链接。我们常见的map()、filter()等都属此类;
- 上游算子的链接策略为HEAD或ALWAYS。HEAD策略表示只能与下游链接,这在正常情况下是Source算子的专属;
- 两个算子间的物理分区逻辑是ForwardPartitioner,可参见之前写过的《聊聊Flink DataStream的八种物理分区逻辑》;
- 两个算子间的shuffle方式不是批处理模式;
- 上下游算子实例的并行度相同;
- 没有禁用算子链。
禁用算子链
用户可以在一个算子上调用startNewChain()方法强制开始一个新的算子链,或者调用disableOperatorChaining()方法指定它不参与算子链。代码位于SingleOutputStreamOperator类中,都是通过改变算子的链接策略实现的。
@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}
@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
return setChainingStrategy(ChainingStrategy.HEAD);
}
如果要在整个运行时环境中禁用算子链,调用StreamExecutionEnvironment.disableOperatorChaining()方法即可。
物理计划中的算子链
在JobGraph转换成ExecutionGraph并交由TaskManager执行之后,会生成调度执行的基本任务单元——StreamTask,负责执行具体的StreamOperator逻辑。在StreamTask.invoke()方法中,初始化了状态后端、checkpoint存储和定时器服务之后,可以发现:
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
构造出了一个OperatorChain实例,这就是算子链在实际执行时的形态。解释一下OperatorChain中的几个主要属性。
private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
- headOperator:算子链的第一个算子,对应JobGraph中的算子链起始节点;
- allOperators:算子链中的所有算子,倒序排列,即headOperator位于该数组的末尾;
- streamOutputs:算子链的输出,可以有多个;
- chainEntryPoint:算子链的“入口点”,它的含义将在后文说明。
由上可知,所有StreamTask都会创建OperatorChain。如果一个算子无法进入算子链,也会形成一个只有headOperator的单个算子的OperatorChain。
OperatorChain构造方法中的核心代码如下。
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
RecordWriterOutput<?> streamOutput = createStreamOutput(
recordWriters.get(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps);
if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
headOperator = null;
}
// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
首先会遍历算子链整体的所有出边,并调用createStreamOutput()方法创建对应的下游输出RecordWriterOutput。然后就会调用createOutputCollector()方法创建物理的算子链,并返回chainEntryPoint,这个方法比较重要,部分代码如下。
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// Create collectors for the chained outputs
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators,
outputEdge.getOutputTag());
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// 以下略......
}
该方法从上一节提到的StreamConfig中分别取出出边和链接边的数据,并创建各自的Output。出边的Output就是将数据发往算子链之外下游的RecordWriterOutput,而链接边的输出要靠createChainedOperator()方法。
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag) {
// create the output that the operator writes to first. this may recursively create more operators
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
containingTask,
operatorConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators);
// now create the operator and give it the output collector to write its output to
StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
containingTask, operatorConfig, chainedOperatorOutput);
allOperators.add(chainedOperator);
WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
// wrap watermark gauges since registered metrics must be unique
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
return currentOperatorOutput;
}
我们一眼就可以看到,这个方法递归调用了上述createOutputCollector()方法,与逻辑计划阶段类似,通过不断延伸Output来产生chainedOperator(即算子链中除了headOperator之外的算子),并逆序返回,这也是allOperators数组中的算子顺序为倒序的原因。
chainedOperator产生之后,将它们通过ChainingOutput连接起来,形成如下图所示的结构。
最后来看看ChainingOutput.collect()方法是如何输出数据流的。
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
pushToOperator(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
// we are only responsible for emitting to the side-output specified by our
// OutputTag.
return;
}
pushToOperator(record);
}
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
operator.setKeyContextElement1(castRecord);
operator.processElement(castRecord);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
可见是通过调用链接算子的processElement()方法,直接将数据推给下游处理了。也就是说,OperatorChain完全可以看做一个由headOperator和streamOutputs组成的单个算子,其内部的chainedOperator和ChainingOutput都像是被黑盒遮蔽,同时没有引入任何overhead。
打通了算子链在执行层的逻辑,看官应该会明白chainEntryPoint的含义了。由于它位于递归返回的终点,所以它就是流入算子链的起始Output,即上图中指向headOperator的RecordWriterOutput。
The End
上半年过去了,希望下半年能够多些好事发生。
民那晚安。