Flink window窗口机制探究--以tumbling windows为例(一)

零、序言

本篇文章探究Flink Window窗口机制,首先介绍窗口机制使用的总纲,涉及的所有组件进行介绍,心中有一个大体的蓝图和认识。之后基于keyBy方法返回的Keyed Window入手,分析window方法,并依次进行WindowAssigner、Trigger类介绍。篇幅所限,计划在其他文章中继续介绍evictor、reduce/aggregate等聚合方法,以及allowedLateness方法等使用。

一、背景&目标

为了实现项目场景中自定义窗口功能,还是要先把目前Flink提供的窗口机制剖析一下,先从简单好理解的入手,以tumbling windows为例,sliding windows思路也相似。

二、窗口机制

考虑keyed Windows ,从官网介绍可以通过窗口机制整个使用方法,提纲挈领了解所涉及的组件,虽然有些组件使用的是时候是可以使用默认而不用指定。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
组件 作用
keyBy 数据流按照key分流
window 需要传入WindowAssigner类,用来进行Event元素时间窗口分配。滚动窗口和session窗口一个Event对应一个时间窗口,滑动窗口一个Eevent可以对应多个时间窗口。
trigger 用来决定触发针对特定时间窗口进行运算的window function执行。
evictor 用来在trigger触发后、window function执行之前进行event过滤。
allowedLatteness 允许event延迟时间。
sideOutputLateData 设置迟到的event 标签
getSideOutput 获取迟到event
reduce/aggregate/fold/apply window function窗口计算函数,对时间窗口中的event 元素进行计算。

先从最基础的WindowAssigner类开始,本篇重点以tumbling windows为例。

三、探究剖析--溯源

Flink1.7.2版本Java代码。

一切都是始于stream.keyBy().window()
stream.keyBy()返回的是一个DataStream类子类:KeyedStream类对象

DataStream类的相关知识已经在通过Flink 程序模板来学习StreamExecutionEnvironment 、DataStream 、StreamTransformation类文章中探究过了。

来看看KeyedStream类对象中:window()

public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }

返回的是一个WindowedStream类:

public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;
    // 其他省略
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        function = input.getExecutionEnvironment().clean(function);
        return apply(new InternalIterableWindowFunction<>(function), resultType, function);
    }
        

第一反应,WindowedStream虽然名字叫WindowedStream但是他不是DataStream类(虽然源代码中也在datastream包中)!但是呢,他提供了一系列计算操作function,返回的可都是DataStream类的子类:SingleOutputStreamOperator类。

之后呢,看到了WindowedStream的成员和方法,可以看到窗口机制的组件 windowAssigner、trigger、evicto和窗口计算函数都在,开心啊,按图索骥即可!

根据实际运行时的dataflow来看,最终Flink拓扑会被转换为一个有一个包含算子的处理结构。Flink怎么把窗口机制所有的组件都调动起来呢?通过观察窗口计算函数返回值都是DataStream类,整个拓扑就串起来了,对应有就有相应的Transformation,也就有相应的operator(StreamOperator真正在底层处理一个一个元素的操作类)。WindowStream 的apply方法对应调用一个private apply方法:

    private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();

        WindowOperator<K, T, Iterable<T>, R, W> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

全程都在构建operator啊! 最后通过 return input.transform(opName, resultType, operator); 也就是说还是串在 KeyedStream上的哦!所以说WindowStream看似是一个 Stream其实只是为了构建Window机制而提供的API,到真正Flink 运行的时候,所有在KeyedStream定义的时间窗口,最终都会因为window function的调用返回一个DataStream,一个新的 Transformation被创建,窗口中的各种组件 windowAssigner 、trigger、evictor都会被打包在EvictingWindowOperator或者WindowOperator传给这个Transformation,Transformation 为王啊!

我们来看看Transformation的operator对象,以WindowOperator类(AbstractUdfStreamOperator的子类)为例,看看他的processElement方法,代码很长100多行,先看骨架:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
        // 代码块1,windowAssigner可以merge
        } else {
        // 代码块2,windowAssigner不可以merge
        }
        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

第一步,调用windowAssigner.assignWindows给当前event element分配Window。
第二步,核心,会针对第一步window集合,一个window一个window一次处理,调用trigger.onElement方法,如果fire的话就会调用window function进行计算。
第三部,处理side ouput,迟到的元素。

四、探究剖析--WindowAssigner类

Flink源码中,WindowAssigner类对应滚动窗口的类有TumblingEventTimeWindows和TumblingProcessingTimeWindows,我们以TumblingEventTimeWindows为例,二者区别主要是窗口时间使用Event Time还是Process Time。

先看一下WindowAssigner类源码,可以看出主要包含四个抽象方法。

/**
 *  WindowAssigner可以分配 0个或者多个 Windows 给 Event 元素.
 * @param <T> Event 元素类别.
 * @param <W> Window类别.
 */
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * 返回Event  element 应该被分配的 window的集合
     * @param timestamp :event 的时间戳.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    /**
        * 返回WindowAssigner默认的trigger
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    /**
        * 返回Window 的 TypeSerializer
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
       /**
        * 是否是 event time
     */
    public abstract boolean isEventTime();
      /**
        *其他省略
     */
}

方法名称 方法用途
assignWindows 返回Event element 应该被分配的 window的集合
getDefaultTrigger 返回WindowAssigner默认的trigger
getWindowSerializer 返回Window 的 TypeSerializer
isEventTime 是否是 event time

接下来 我们来看 继承WindowAssigner类 的 TumblingEventTimeWindows类的具体实现。

/**
 *   示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
  // 窗口大小
    private final long size;
  // 偏移量
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
                        // 根据滚动窗口机制,按照当前timestamp,计算对应窗口的start时间,并返回对应窗口                
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
// 其他省略
    @Override
    public boolean isEventTime() {
        return true;
    }
}

我们可以看到,主要区别就是包含了size 和 offset两个变量,使得 assignWindows 实现的时候可以返回想要的翻滚窗口。另外上面代码可以看出getDefaultTrigger返回的是 EventTimeTrigger类。

接下来我们看一下Trigger类。

Trigger方法名称 方法用途
onElement 每当有event element 被加到window中,会触发。结果返回事件元素对应的window是否可以进行window function计算。
onProcessingTime timer 计时器触发调用,使用的是process time 。
onEventTime 同上,使用的是 event time。
canMerge 是否支持 窗口合并,如果返回true,必须实现onMerge方法
onMerge 当多个window被WindowAssigner合并的时候的调用。
clear 清理相关window 的state
TriggerContext TriggerContext接口,给Trigger提供state 处理和注册Timer callback
OnMergeContext TriggerContext子接口,onMerge方法使用,增加了mergePartitionedState方法。

单独整理TriggerContext 接口方法

TriggerContext方法名称 方法用途
getCurrentProcessingTime 返回当前processing time
getMetricGroup 返回MetricGroup类对象
getCurrentWatermark 返回当前Watermark time
registerProcessingTimeTimer 注册time callback ,一旦到达time,Trigger的onProcessingTime会被调用
registerEventTimeTimer 同上,当watermark 达到time,会触发Trigger的onEventTime方法。
deleteProcessingTimeTimer 删除指定时间的processing time trigger
deleteEventTimeTimer 删除指定时间的event time trigger
getPartitionedState 返回 State对象
getKeyValueState 返回ValueState对象
/**
 * @param <T> Event 元素类别.
 * @param <W> Window类别.
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

接下来我们看继承Trigger的 EventTimeTrigger类的实现:

/**
 * EventTime使用watermark,一旦 watermark 超过 the end of the window,EventTimeTrigger触发
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}
    /**
     * 判断window的最大时间戳是否小于目前的watermark,小于的话返回TriggerResult.FIRE,否则的话为这个window注册一个trimer,返回TriggerResult.CONTINUE。TriggerResult是个枚举类型
     */    
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

顺便看一下枚举类TriggerResult:
TriggerResult决定window将会发生什么,如window function是否会调用,或者window是否被丢弃。当然如果window里面没有任何数据,什么都不会发生。

TriggerResult 值 解释
CONTINUE 对于window来说什么都不会发生
FIRE_AND_PURGE 触发window function ,而且 purge
FIRE 触发window function, window不会被purged
PURGE window会被丢弃,里面所有的元素都被清理
public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容