一、背景知识
在时间窗口里,Flink根据元素的时间属性,将元素归类到特定时间窗口,应用程序可以从一个时间戳推断出某个事件属于哪个时间窗口,但它如何能知道它已经收到了某个时间窗口内的所有事件并且可以关闭当前窗口了呢?
为了计算基于时间窗口的聚合,我们需要能够将事件映射到窗口,并且应用程序需要知道它何时(即便只是估计)能够关闭一个给定的窗口并报告计算结果。window区间内可以认为有两个基本的概念window_start_time和window_end_time,顾名思义,window_start_time代表的是区间的起始时间,而window_end_time代表的是window时间区间的结束时间,根据元素时间戳就可以将元素放到合适的窗口。时间戳可以解决元素归类到窗口的问题,水位解决第二个问题。水位本质上是个时间戳,在flink中,水位是单调递增的,表示数据处理的进度,当watermark >=window_end_time,即水位线大于window_end_time 时才会真正触发窗口计算。实际上会有多种触发机制,窗口相关的内容会在后续文章里细聊。
二、延迟计算和数据完整性
水印是数据延迟和结果正确性之间的一种平衡,在计算之前,需要等多长时间来确定数据完全到达。在用基于事件时间的算子里,水印决定摄取到的数据完整性和计算的进度。但现实场景里,没有完美的水印,因为那意味着始终没有数据延迟。生成不准确的水印,可能导致数据不完整或增加不必要的应用延迟。
如果生成的水印较已处理元素的时间戳更早,窗口可以接受更多落后的元素,但可能增加计算结果的延迟,而且通常状态的大小也会更大,因为应用需要缓存更多的数据;但是如果水印生成的频率比较大(水印可能大于某些延迟消息的时间戳),则产生的结果可能不准确,但延迟会比较低。
三、水位生成器分类
在Flink中使用事件时间时,必须对流中事件进行时间戳赋值。因为在基于事件时间上的所有操作都会使用事件时间,所以尽早为消息分配时间戳是个不错的实践,建议在SourceFunction之后就配置时间戳和水印生成策略。当然,如果操作不会引起数据流的重分配,也可以在操作之后设置水位生成器,比如在filter算子之后配置策略。Flink会根据设置的水印生成器策略为数据流中的元素分配时间戳,并生成水印,以表示时间进度。
有多种方法配置时间戳、水位生成策略,比较常用的有两种方法配置:
调用DataStream的assignTimestampsAndWatermarks方法,内置了水位生成机制
调用DataStream的assignTimestamps方法,时间属性抽取、水位生成、当前水位获取都要自定义实现;新版本不的API不建议使用。
水位生成器分类如下:
3.1AssignerWithPeriodicWatermarks
根据指定的时间间隔周期性的发射水位。Flink实现了两种周期性水位生成器:
AscendingTimestampExtractor:
升序模式,会将数据中的时间戳根据指定字段抽取,并用当前时间戳作为最新水印,这种比较适用于数据流里事件是按顺序生成,没有乱序情况。子类实现extractAscendingTimestamp方法,从元素里抽取时间戳(currentTimestamp),然后根据这个时间戳创建水印(new Watermark(currentTimestamp)),如果下个元素时间戳比currentTimestamp小,创建的水印的时间戳保持不变,这样可以保证水印是单调递增的,而且该元素会抛弃掉,因为时间比水印早,也即是如果一个元素的时间戳比上个元素时间戳早,就会抛弃掉。
BoundedOutOfOrdernessTimestampExtractor:
通过固定的时间间隔来指定水印落后于时间戳的区间长度,也就是最长容忍迟多长的时间内到达,适用于了解消息最大延迟时间的情况下。类的构造函数传入一个时间,指定最大的落后时间;子类实现extractTimestamp方法,从元素里抽取时间戳。同样也是先从元素里抽取到时间戳(currentTimestamp),但是创建水印的方式是new Watermark(currentTimestamp-maxOutOfOrderness),如果下个元素的时间戳在水印之后,即currentTimestamp-maxOutOfOrderness之后,就不会丢弃,所以允许消息最大落后maxOutOfOrderness个时间单位。
示意图见下图,红色元素时间戳在水位之前,会丢弃;绿色元素时间戳在水位范围内,是有效元素;蓝色是周期性触发生成水印时,生成的水印:
3.2 AssignerWithPunctuatedWatermarks
算子在每次处理元素时就发射水印,即是在每次抽取元素的时间戳后,根据元素和元素的时间戳马上生成水位。可完全控制水位的生成,设置的水位比较精准,但当数据量大时,频繁的emit水位会降低降低性能。适用于基于输入元素的其他属性生成水位、元素里得属性可以明确得指定流处理进度的情况。子类实现checkAndGetNextWatermark方法,该方法接收两个参数,第一个参数是元素值,第二个参数是元素的时间戳,方法最终返回一个水位。
3.2 TimestampExtractor
上面两种方式的混合体:算子在每次处理元素时就发射水位并同时支持周期性发射水位。可完全控制水位生成。
四、水位emit实现过程
水印生成的实现类似于一个transformation算子:算子会作用到流里的每个元素,生成一个新的带有时间戳元素的数据流,并发射水印。水位生成器不会改变数据流的元素类型。
首先算子每接收到一个元素时,会在processElement方法里对元素进行处理,生成一个新的数据流,为数据流里的元素增加时间戳属性:
// 1、根据设置的TimestampAssigner获取元素时间戳
final longnewTimestamp =userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
// 2、根据时间戳重新生成时间,即为事件增加时间属性
StreamRecord recordWithTT=newStreamRecord(element.getValue(), newTimestamp)
// 3、输出新的数据流
output.collect(recordWithTT);
有三种水位生成实现算子,对应三种水位生成器方式,下面分别介绍。
4.1 TimestampsAndPeriodicWatermarksOperator
为数据流的元素分配时间戳,并定期生成水位。水位生成器时间间隔可以ExecutionConfig.setAutoWatermarkInterval方法设置,默认200毫秒。程序里根据ScheduledThreadPoolExecutor延迟某个时间后,调度TriggerTask,TriggerTask的run方法里回调onProcessingTime方法,该方法里实现水印发送:
水印的定时生成,不是通过调度任务周期性调用的,而是每次注册一个新的线程(scheduledExecutor.schedule),然后延迟执行,线程执行时会回调onProcessingTime方法,实现水印循环生成。这这个地方如果该用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法,周期性的执行线程会更好,不用每次都重新创建线程、将线程放到线程调度池里。
这种水印生成方法是周期性调用的,所以即使数据流里没有元素,也会定时调用,获取当前水印,只是不会最终emit水印(没有元素的情况下,水印不会向前移动)。
4.2TimestampsAndPunctuatedWatermarksOperator
水印的生成和元素的处理是绑定在一起,元素处理好就理解emit水位,即直接在算子的processElement方法里调用checkAndGetNextWatermark获取该元素的水印,这个函数是要用户自定义实现的,可以完全控制水印生成。如果返回的水印大于最近emit的水印,则发射新的水印。
// 1、调用水印生成器(TimestampAssigner)方法,抽取数据流中元素的时间戳
final longnewTimestamp = userFunction.extractTimestamp(element.value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
// 2、调用水印生成器(TimestampAssigner)方法,获取该元素的水印
final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(element.value, newTimestamp);
// 3、将返回的水印和最近发射的水印进行比较
if(nextWatermark !=null&& nextWatermark.getTimestamp() > currentWatermark) {
// 如果水印是递增的,就发射新的水印 currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark);
}
因为是对每个元素都会生成水印,所以发射水印可能会很频繁,适用于元素里可以明确标识水印的场景。
4.3 ExtractTimestampsOperator
上面两种发射水位的结合形成的一种方案,即支持周期性水位发射,又支持在每次处理元素时就发射水位。
怎么给元素分配到合适的窗口?窗口怎么一个触发机制?这些问题留待后续探讨。