Flink CookBook | 水位生成机制

一、背景知识


    在时间窗口里,Flink根据元素的时间属性,将元素归类到特定时间窗口,应用程序可以从一个时间戳推断出某个事件属于哪个时间窗口,但它如何能知道它已经收到了某个时间窗口内的所有事件并且可以关闭当前窗口了呢?

    为了计算基于时间窗口的聚合,我们需要能够将事件映射到窗口,并且应用程序需要知道它何时(即便只是估计)能够关闭一个给定的窗口并报告计算结果。window区间内可以认为有两个基本的概念window_start_time和window_end_time,顾名思义,window_start_time代表的是区间的起始时间,而window_end_time代表的是window时间区间的结束时间,根据元素时间戳就可以将元素放到合适的窗口。时间戳可以解决元素归类到窗口的问题,水位解决第二个问题。水位本质上是个时间戳,在flink中,水位是单调递增的,表示数据处理的进度,当watermark >=window_end_time,即水位线大于window_end_time 时才会真正触发窗口计算。实际上会有多种触发机制,窗口相关的内容会在后续文章里细聊。

二、延迟计算和数据完整性


    水印是数据延迟和结果正确性之间的一种平衡,在计算之前,需要等多长时间来确定数据完全到达。在用基于事件时间的算子里,水印决定摄取到的数据完整性和计算的进度。但现实场景里,没有完美的水印,因为那意味着始终没有数据延迟。生成不准确的水印,可能导致数据不完整或增加不必要的应用延迟。

    如果生成的水印较已处理元素的时间戳更早,窗口可以接受更多落后的元素,但可能增加计算结果的延迟,而且通常状态的大小也会更大,因为应用需要缓存更多的数据;但是如果水印生成的频率比较大(水印可能大于某些延迟消息的时间戳),则产生的结果可能不准确,但延迟会比较低。

三、水位生成器分类


     在Flink中使用事件时间时,必须对流中事件进行时间戳赋值。因为在基于事件时间上的所有操作都会使用事件时间,所以尽早为消息分配时间戳是个不错的实践,建议在SourceFunction之后就配置时间戳和水印生成策略。当然,如果操作不会引起数据流的重分配,也可以在操作之后设置水位生成器,比如在filter算子之后配置策略。Flink会根据设置的水印生成器策略为数据流中的元素分配时间戳,并生成水印,以表示时间进度。

有多种方法配置时间戳、水位生成策略,比较常用的有两种方法配置:

    调用DataStream的assignTimestampsAndWatermarks方法,内置了水位生成机制

    调用DataStream的assignTimestamps方法,时间属性抽取、水位生成、当前水位获取都要自定义实现;新版本不的API不建议使用。

水位生成器分类如下:

3.1AssignerWithPeriodicWatermarks

    根据指定的时间间隔周期性的发射水位。Flink实现了两种周期性水位生成器:

        AscendingTimestampExtractor:

        升序模式,会将数据中的时间戳根据指定字段抽取,并用当前时间戳作为最新水印,这种比较适用于数据流里事件是按顺序生成,没有乱序情况。子类实现extractAscendingTimestamp方法,从元素里抽取时间戳(currentTimestamp),然后根据这个时间戳创建水印(new Watermark(currentTimestamp)),如果下个元素时间戳比currentTimestamp小,创建的水印的时间戳保持不变,这样可以保证水印是单调递增的,而且该元素会抛弃掉,因为时间比水印早,也即是如果一个元素的时间戳比上个元素时间戳早,就会抛弃掉。

    BoundedOutOfOrdernessTimestampExtractor:

        通过固定的时间间隔来指定水印落后于时间戳的区间长度,也就是最长容忍迟多长的时间内到达,适用于了解消息最大延迟时间的情况下。类的构造函数传入一个时间,指定最大的落后时间;子类实现extractTimestamp方法,从元素里抽取时间戳。同样也是先从元素里抽取到时间戳(currentTimestamp),但是创建水印的方式是new Watermark(currentTimestamp-maxOutOfOrderness),如果下个元素的时间戳在水印之后,即currentTimestamp-maxOutOfOrderness之后,就不会丢弃,所以允许消息最大落后maxOutOfOrderness个时间单位。

示意图见下图,红色元素时间戳在水位之前,会丢弃;绿色元素时间戳在水位范围内,是有效元素;蓝色是周期性触发生成水印时,生成的水印:

3.2 AssignerWithPunctuatedWatermarks

    算子在每次处理元素时就发射水印,即是在每次抽取元素的时间戳后,根据元素和元素的时间戳马上生成水位。可完全控制水位的生成,设置的水位比较精准,但当数据量大时,频繁的emit水位会降低降低性能。适用于基于输入元素的其他属性生成水位、元素里得属性可以明确得指定流处理进度的情况。子类实现checkAndGetNextWatermark方法,该方法接收两个参数,第一个参数是元素值,第二个参数是元素的时间戳,方法最终返回一个水位。

3.2 TimestampExtractor

    上面两种方式的混合体:算子在每次处理元素时就发射水位并同时支持周期性发射水位。可完全控制水位生成。

四、水位emit实现过程


    水印生成的实现类似于一个transformation算子:算子会作用到流里的每个元素,生成一个新的带有时间戳元素的数据流,并发射水印。水位生成器不会改变数据流的元素类型。

    首先算子每接收到一个元素时,会在processElement方法里对元素进行处理,生成一个新的数据流,为数据流里的元素增加时间戳属性:

// 1、根据设置的TimestampAssigner获取元素时间戳

final longnewTimestamp =userFunction.extractTimestamp(value,        element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

// 2、根据时间戳重新生成时间,即为事件增加时间属性

StreamRecord recordWithTT=newStreamRecord(element.getValue(), newTimestamp)

// 3、输出新的数据流

output.collect(recordWithTT);

有三种水位生成实现算子,对应三种水位生成器方式,下面分别介绍。

4.1 TimestampsAndPeriodicWatermarksOperator

    为数据流的元素分配时间戳,并定期生成水位。水位生成器时间间隔可以ExecutionConfig.setAutoWatermarkInterval方法设置,默认200毫秒。程序里根据ScheduledThreadPoolExecutor延迟某个时间后,调度TriggerTask,TriggerTask的run方法里回调onProcessingTime方法,该方法里实现水印发送:

    水印的定时生成,不是通过调度任务周期性调用的,而是每次注册一个新的线程(scheduledExecutor.schedule),然后延迟执行,线程执行时会回调onProcessingTime方法,实现水印循环生成。这这个地方如果该用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法,周期性的执行线程会更好,不用每次都重新创建线程、将线程放到线程调度池里。

这种水印生成方法是周期性调用的,所以即使数据流里没有元素,也会定时调用,获取当前水印,只是不会最终emit水印(没有元素的情况下,水印不会向前移动)。

4.2TimestampsAndPunctuatedWatermarksOperator

    水印的生成和元素的处理是绑定在一起,元素处理好就理解emit水位,即直接在算子的processElement方法里调用checkAndGetNextWatermark获取该元素的水印,这个函数是要用户自定义实现的,可以完全控制水印生成。如果返回的水印大于最近emit的水印,则发射新的水印。

// 1、调用水印生成器(TimestampAssigner)方法,抽取数据流中元素的时间戳

final longnewTimestamp = userFunction.extractTimestamp(element.value,        element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

// 2、调用水印生成器(TimestampAssigner)方法,获取该元素的水印

final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(element.value, newTimestamp);

// 3、将返回的水印和最近发射的水印进行比较

if(nextWatermark !=null&& nextWatermark.getTimestamp() > currentWatermark) {

    // 如果水印是递增的,就发射新的水印          currentWatermark = nextWatermark.getTimestamp();          output.emitWatermark(nextWatermark);

}

因为是对每个元素都会生成水印,所以发射水印可能会很频繁,适用于元素里可以明确标识水印的场景。

4.3 ExtractTimestampsOperator

    上面两种发射水位的结合形成的一种方案,即支持周期性水位发射,又支持在每次处理元素时就发射水位。

    怎么给元素分配到合适的窗口?窗口怎么一个触发机制?这些问题留待后续探讨。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341