零、序言
本篇文章探究Flink Window窗口机制,首先介绍窗口机制使用的总纲,涉及的所有组件进行介绍,心中有一个大体的蓝图和认识。之后基于keyBy方法返回的Keyed Window入手,分析window方法,并依次进行WindowAssigner、Trigger类介绍。篇幅所限,计划在其他文章中继续介绍evictor、reduce/aggregate等聚合方法,以及allowedLateness方法等使用。
一、背景&目标
为了实现项目场景中自定义窗口功能,还是要先把目前Flink提供的窗口机制剖析一下,先从简单好理解的入手,以tumbling windows为例,sliding windows思路也相似。
二、窗口机制
考虑keyed Windows ,从官网介绍可以通过窗口机制整个使用方法,提纲挈领了解所涉及的组件,虽然有些组件使用的是时候是可以使用默认而不用指定。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
组件 | 作用 |
---|---|
keyBy | 数据流按照key分流 |
window | 需要传入WindowAssigner类,用来进行Event元素时间窗口分配。滚动窗口和session窗口一个Event对应一个时间窗口,滑动窗口一个Eevent可以对应多个时间窗口。 |
trigger | 用来决定触发针对特定时间窗口进行运算的window function执行。 |
evictor | 用来在trigger触发后、window function执行之前进行event过滤。 |
allowedLatteness | 允许event延迟时间。 |
sideOutputLateData | 设置迟到的event 标签 |
getSideOutput | 获取迟到event |
reduce/aggregate/fold/apply | window function窗口计算函数,对时间窗口中的event 元素进行计算。 |
先从最基础的WindowAssigner类开始,本篇重点以tumbling windows为例。
三、探究剖析--溯源
Flink1.7.2版本Java代码。
一切都是始于stream.keyBy().window()
stream.keyBy()返回的是一个DataStream类子类:KeyedStream类对象
DataStream类的相关知识已经在通过Flink 程序模板来学习StreamExecutionEnvironment 、DataStream 、StreamTransformation类文章中探究过了。
来看看KeyedStream类对象中:window()
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
返回的是一个WindowedStream类:
public class WindowedStream<T, K, W extends Window> {
/** The keyed data stream that is windowed by this stream. */
private final KeyedStream<T, K> input;
/** The window assigner. */
private final WindowAssigner<? super T, W> windowAssigner;
/** The trigger that is used for window evaluation/emission. */
private Trigger<? super T, ? super W> trigger;
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
// 其他省略
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableWindowFunction<>(function), resultType, function);
}
第一反应,WindowedStream虽然名字叫WindowedStream但是他不是DataStream类(虽然源代码中也在datastream包中)!但是呢,他提供了一系列计算操作function,返回的可都是DataStream类的子类:SingleOutputStreamOperator类。
之后呢,看到了WindowedStream的成员和方法,可以看到窗口机制的组件 windowAssigner、trigger、evicto和窗口计算函数都在,开心啊,按图索骥即可!
根据实际运行时的dataflow来看,最终Flink拓扑会被转换为一个有一个包含算子的处理结构。Flink怎么把窗口机制所有的组件都调动起来呢?通过观察窗口计算函数返回值都是DataStream类,整个拓扑就串起来了,对应有就有相应的Transformation,也就有相应的operator(StreamOperator真正在底层处理一个一个元素的操作类)。WindowStream 的apply方法对应调用一个private apply方法:
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
KeySelector<T, K> keySel = input.getKeySelector();
WindowOperator<K, T, Iterable<T>, R, W> operator;
if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
evictor,
allowedLateness,
lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
}
全程都在构建operator啊! 最后通过 return input.transform(opName, resultType, operator); 也就是说还是串在 KeyedStream上的哦!所以说WindowStream看似是一个 Stream其实只是为了构建Window机制而提供的API,到真正Flink 运行的时候,所有在KeyedStream定义的时间窗口,最终都会因为window function的调用返回一个DataStream,一个新的 Transformation被创建,窗口中的各种组件 windowAssigner 、trigger、evictor都会被打包在EvictingWindowOperator或者WindowOperator传给这个Transformation,Transformation 为王啊!
我们来看看Transformation的operator对象,以WindowOperator类(AbstractUdfStreamOperator的子类)为例,看看他的processElement方法,代码很长100多行,先看骨架:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
// 代码块1,windowAssigner可以merge
} else {
// 代码块2,windowAssigner不可以merge
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
第一步,调用windowAssigner.assignWindows给当前event element分配Window。
第二步,核心,会针对第一步window集合,一个window一个window一次处理,调用trigger.onElement方法,如果fire的话就会调用window function进行计算。
第三部,处理side ouput,迟到的元素。
四、探究剖析--WindowAssigner类
Flink源码中,WindowAssigner类对应滚动窗口的类有TumblingEventTimeWindows和TumblingProcessingTimeWindows,我们以TumblingEventTimeWindows为例,二者区别主要是窗口时间使用Event Time还是Process Time。
先看一下WindowAssigner类源码,可以看出主要包含四个抽象方法。
/**
* WindowAssigner可以分配 0个或者多个 Windows 给 Event 元素.
* @param <T> Event 元素类别.
* @param <W> Window类别.
*/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 返回Event element 应该被分配的 window的集合
* @param timestamp :event 的时间戳.
*/
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* 返回WindowAssigner默认的trigger
*/
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* 返回Window 的 TypeSerializer
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* 是否是 event time
*/
public abstract boolean isEventTime();
/**
*其他省略
*/
}
方法名称 | 方法用途 |
---|---|
assignWindows | 返回Event element 应该被分配的 window的集合 |
getDefaultTrigger | 返回WindowAssigner默认的trigger |
getWindowSerializer | 返回Window 的 TypeSerializer |
isEventTime | 是否是 event time |
接下来 我们来看 继承WindowAssigner类 的 TumblingEventTimeWindows类的具体实现。
/**
* 示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
*/
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// 窗口大小
private final long size;
// 偏移量
private final long offset;
protected TumblingEventTimeWindows(long size, long offset) {
if (offset < 0 || offset >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
}
this.size = size;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// 根据滚动窗口机制,按照当前timestamp,计算对应窗口的start时间,并返回对应窗口
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
// 其他省略
@Override
public boolean isEventTime() {
return true;
}
}
我们可以看到,主要区别就是包含了size 和 offset两个变量,使得 assignWindows 实现的时候可以返回想要的翻滚窗口。另外上面代码可以看出getDefaultTrigger返回的是 EventTimeTrigger类。
接下来我们看一下Trigger类。
Trigger方法名称 | 方法用途 |
---|---|
onElement | 每当有event element 被加到window中,会触发。结果返回事件元素对应的window是否可以进行window function计算。 |
onProcessingTime | timer 计时器触发调用,使用的是process time 。 |
onEventTime | 同上,使用的是 event time。 |
canMerge | 是否支持 窗口合并,如果返回true,必须实现onMerge方法 |
onMerge | 当多个window被WindowAssigner合并的时候的调用。 |
clear | 清理相关window 的state |
TriggerContext | TriggerContext接口,给Trigger提供state 处理和注册Timer callback |
OnMergeContext | TriggerContext子接口,onMerge方法使用,增加了mergePartitionedState方法。 |
单独整理TriggerContext 接口方法
TriggerContext方法名称 | 方法用途 |
---|---|
getCurrentProcessingTime | 返回当前processing time |
getMetricGroup | 返回MetricGroup类对象 |
getCurrentWatermark | 返回当前Watermark time |
registerProcessingTimeTimer | 注册time callback ,一旦到达time,Trigger的onProcessingTime会被调用 |
registerEventTimeTimer | 同上,当watermark 达到time,会触发Trigger的onEventTime方法。 |
deleteProcessingTimeTimer | 删除指定时间的processing time trigger |
deleteEventTimeTimer | 删除指定时间的event time trigger |
getPartitionedState | 返回 State对象 |
getKeyValueState | 返回ValueState对象 |
/**
* @param <T> Event 元素类别.
* @param <W> Window类别.
*/
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
public boolean canMerge() {
return false;
}
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
public abstract void clear(W window, TriggerContext ctx) throws Exception;
// ------------------------------------------------------------------------
/**
* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
*/
public interface TriggerContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
接下来我们看继承Trigger的 EventTimeTrigger类的实现:
/**
* EventTime使用watermark,一旦 watermark 超过 the end of the window,EventTimeTrigger触发
*/
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
/**
* 判断window的最大时间戳是否小于目前的watermark,小于的话返回TriggerResult.FIRE,否则的话为这个window注册一个trimer,返回TriggerResult.CONTINUE。TriggerResult是个枚举类型
*/
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
顺便看一下枚举类TriggerResult:
TriggerResult决定window将会发生什么,如window function是否会调用,或者window是否被丢弃。当然如果window里面没有任何数据,什么都不会发生。
TriggerResult 值 | 解释 |
---|---|
CONTINUE | 对于window来说什么都不会发生 |
FIRE_AND_PURGE | 触发window function ,而且 purge |
FIRE | 触发window function, window不会被purged |
PURGE | window会被丢弃,里面所有的元素都被清理 |
public enum TriggerResult {
CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return fire;
}
public boolean isPurge() {
return purge;
}
}