flink 学习笔记 — 时间定义及窗口机制

flink 回顾

    通过之前的了解,我们知道,flink是一个高吞吐、低延时的流式处理框架。flink 中具有严格的时间定义,采用不同的时间机制,对于其处理延时及处理结果的准确性都有重要的影响。而在不同的时间机制下,我们想要得到准确一致的结果,在 flink 中我们又能怎么处理呢?

flink 时间定义

    事实上,flink 提供了三种不同的时间标的,分别是 Event Time(事件时间)、Ingestion Time(注入时间)、Processing Time(处理时间),下图给出了三种时间对应的发生时机:

flink各时间触发时机图.jpg

  • Event Time:由上图可知,eventTime 是事件真正发生的时间,在事件发生时触发,此后事件事件会跟随数据流进入 flink 系统中,供处理使用。在 flink 系统中,我们可以使用该时间,当然,也可以使用其他的时间作为标准,这是由业务需求所决定。事实上,在整个数据流中,eventTime 并不是严格有序的,尤其是在我们处理分布式消息中间件(如:kafka)中的数据时,eventTime 一定会出现乱序的情况。因此在使用 eventTime 且想要得到有序的结果时,需要做好乱序数据的处理。在 flink 中,可有使用 watermark 机制进行解决。采用 eventTime 我们在进行多次重复计算时,可以得到完全一致的结果。
  • Ingestion Time:我们可以理解为注入时间,它是时间流进入 flink 处理框架中的时机。通常情况下,我们以数据源 source 为数据进入系统中的起点,因此注入时间就是 source 算子发生时所在的机器的当前时间戳。这里我们并不需要考虑事件数据流是否有序的问题。
  • Processing Time:事件处理事件,它是事件处理时,执行 task 所在的机器的当前时间戳。当我们使用 processing time 作为处理时间流的时间机制时,不需要考虑事件的乱序情况,只要考虑系统本身的事件就ok了。事实上,在使用 processing time 时,对与同一批数据进行多次计算时,得到的结果很可能不会一致。当然,如果我们的业务需求并不需要我们多次得到一致的结果时,采用 processing time 是最有效、快捷的方式。flink 本身默认的事件机制就是 processing time。

    在使用 flink 进行编码时,我们可以这样在代码中指定我们需要采用的时间方式,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);:

public static void main(String []args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指定时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> dataSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] values = value.split(" ");
                return new Tuple2<String, Integer>(values[0], 1);
            }
        }).keyBy(0)
          .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
              @Override
              public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                  return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
              }
          });

        reduce.print();
        env.execute("test");
    }

    在常用的业务处理时,对于 Ingestion Time 和 Processing Time 的使用相对较少,因此接下来将对 Event Time 在个业务中的使用做详细的介绍。

flink 窗口机制

    Flink 中窗口包括 count window 和 time window,而 time window 又分为 Tumbling Time Window、Sliding Time Window、Session Window 以及 Global Window

各类型窗口图.png

  • Count Window:计数窗口,根据窗口内数据的数量进行滚动进入下一个窗口。如上图中,我们以 2 为窗口大小,即每当窗口中的数据为 2 时进行一次计算,然后进入下一批次。在 api 中可以这么实现:
public class TimeAndWindow {

    public static void main(String []args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 创建数据源
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] values = value.split(" ");
                return new Tuple2<String, Integer>(values[0], 1);
            }
        }).keyBy(0)
          .countWindow(2)   // 使用count window
          .sum(1);

        sum.print();

        env.execute("test");
    }
}
  • Tumbling Time Window:译为滚动时间窗口,在使用 count window 时,跟我们所 set 的时间没有太大关系,但时当我们使用时间窗口时,根据不同的时间机制,需要对窗口有不同的操作。如下 flink 源码:
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    由下面源码我们可以看到,在使用 ProcessingTime 和 EventTime 时,flink 底层有不同的实现,结下来分别看一下两种实现的区别。可以看到使用 ProcessingTime 时,flink 内部会通过上下文获取当前时间戳进行处理,而在 EventTime 中,需要依赖事件中所携带的时间戳。因此,需要我们先 DataStream.assignTimestampsAndWatermarks(...) 进行时间分配,否则会抛出异常,而 ProcessingTime 无需调用。

//  ProcessingTime 机制 assignWindows 实现
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}


//  EventTime 机制 assignWindows 实现
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

    下图描述了 Flink 的窗口机制以及各组件之间是如何相互工作的。由数据源流入的每条数据,会由 WindowAssigner 分配到对应的 Window,当 Window 被触发之后,会交给 Evictor(如果没有设置 Evictor 则跳过),然后处理 UserFunction。


时间窗口底层图.png
  • Sliding Time Window:滑动窗口,此时触发的 timeWindow 需要传入两个参数: timeWindow(Time.milliseconds(2), Time.milliseconds(2)),其中第一个参数为窗口的大小,第二个参数为窗口滑动的步长。事实上,由源码可以看到,在使用滚动窗口时,flink 内部会为窗口默认一个 offset = 0,而在滑动窗口时显示指定 offset:
/**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and offset.
     *
     * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
     * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
     * time windows start at 0:15:00,1:15:00,2:15:00,etc.
     *
     * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
     * such as China which is using UTC+08:00,and you want a time window with size of one day,
     * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
     * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
     *
     * @param size The size of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }
  • Session Window:这类窗口通常的使用场景是页面浏览,当两个 session 之间的 gap 大于某个阈值的时候进行计算。
  • Global Window:这类窗口通常相当于一个窗口,在实现时调用 windowAll 方法:
@PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<>(this, assigner);
    }
    
@PublicEvolving
    public AllWindowedStream(DataStream<T> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input.keyBy(new NullByteKeySelector<T>());
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

WaterMark 水印机制

    WaterMark 是 flink 为应对 EventTime 的乱序所提出的一种机制,它可以延迟窗口的触发时间,在一定程度上保证延时到来的数据可以进入到相应的时间窗口进行处理,从而保证多次只从同一批数据时所得到的结果是一致的。通常情况下,WaterMark 的生成时在 source 之后,越接近数据源,得到的数据越准确。事实上,WaterMark本身也是一种认为插入的事件,具有相应的时间戳,我们认为,在 WaterMark 到达时,该时间以前的数据已经全部进入相应的窗口内。

watermark.png

    Flink 中,WaterMark 具有两种不同的实现机制,1. AssignerWithPeriodicWatermarks(周期性),2. AssignerWithPunctuatedWatermarks(伴随着每个最新的事件时间)。

// AssignerWithPeriodicWatermarks 水印生成实现
assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            Long currTime = 0L;
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currTime);
            }

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                String[] values = element.split(" ");
                return Long.parseLong(values[1]);
            }
        })
        
// AssignerWithPunctuatedWatermarks 水印生成实现
assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(String element, long previousElementTimestamp) {
                        return 0;
                    }
                })
使用场景:
  • AssignerWithPeriodicWatermarks:周期性插入时间戳,这种 WaterMark 通常在数据量较大的数据流中使用。
  • AssignerWithPunctuatedWatermarks:这种 WaterMark 通常是伴随着每一条数据进入 flink 系统中,在抽取过程中如果携带的 waterMark 小于当前 WaterMark 则不更新,否则更新至较大的 WaterMark 。这种方式不建议在数据流较大的业务中使用,会大大增加系统本身的压力,影响数据处理的性能。
WarterMark 源码
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

public interface TimestampAssigner<T> extends Function {

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned from a previous assigner,
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param element The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value, if no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long previousElementTimestamp);

    无论是 AssignerWithPeriodicWatermarks,还是 AssignerWithPunctuatedWatermarks,都继承了 TimestampAssigne 来获取需要的时间戳。

  • 事实上,对于 EventTime 延迟较小的事件流,使用 WaterMark 机制可以在一定程度上保证数据的完整性,然而当数据的延时较大时,比如,由于某些原因,我们的数据延迟了24小时,这种情况下使用 WaterMark 显然是不合适的,Flink 提供了 allowedLateness 方法进行处理,这里暂时不做介绍。

总结

    本文简要介绍了 Flink 中的时间机制以及窗口机制,事实上,这两种机制是完全结偶。而 WarterMark 的使用可以最大限度的解决数据流的乱序问题,但同时造成了一定程度的时延。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容