时间语义
上图是数据流式处理过程,涉及到两个重要的时间点:事件时间(Event Time)和处理时间(Processing Time)。
- 事件时间(Event Time):即数据产生的时间;
- 处理时间(Processing Time):即数据真正被处理的时刻;
我们在处理数据时,以哪种时间作为衡量标准,就是所谓的时间语义问题(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有滞后。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。
水位线(Watermark)
我们把时钟也数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接到广播下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似流水中用来当做标志的几号,在Flink中,这种用来衡量事件时间(Event Time)进展的标记,就被称作水位线(Watermark)。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而他插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
理想的水位线是有序的,但是现实中由于不可控因素常常会有少量乱序的数据。
水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据,这一点对于乱序流的正确处理非常重要。水位线的特性:
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据;
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展;
- 水位显示基于数据的时间戳生成的;
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进;
- 水位线可以通过设置延迟,以保证正确处理乱序数据;
- 一个水位线(Watermark)t表示在当前流中事件时间已经到达了时间戳t,这导表t之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据;
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
如何生成水位线
在生成水位线的时候,如果希望计算结果更准确,可以将水位线延迟设置得更高一些,等待时间越长,越不容易漏掉数据,但是这样时效性降低了。而如果将等待时间设置过短则会遗漏掉部分数据,虽然Flink提供了处理迟到数据的方法,但是需要分开处理。因此如何设置延迟是一个需要根据实际情况权衡的问题。
在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampAndWatermarks()
,他主要用来为流中的数据分配时间戳,并生成水位线来显示时间。该方法需要传入一个WatermarkStrategy
作为参数,WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator:
-
TimestampAssigner
, 主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础; -
WatermarkGenerator
, 主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator
中主要有两个方法:onEvent和onPeriodicEmit;-
onEvent
, 每个事件(数据)到来都会调用的方法,他的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作; -
onPeriodiEmit
, 周期性调用的方法,可以由WatermarkOutput
发出水位线。周期时间为处理时间,可以调用环境配置的setAutoWatermarkInterval()方法来设置,默认为200ms;
-
代码
Flink提供了内置的水位线生成器:
- 有序流,对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
- 乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的时间延迟(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码
自定义水位线
在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了。不同策略的关键在于WatermarkGenerator的实现。整体来说,Flink有两种不同的生产水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated):
- 周期性水位线生成器(Periodic Generator),周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线;
- 断点式水位线生成器(Punctuated Generator),断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
此外,也可以在自定义数据源中发送水位线,但是这样就不能使用assignTimestampsAndWatermarks 方法来生成水位线了,两者只能二选一。
在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这说明上游各个分区处理得有快有慢,进度各不相同,这时我们应该以最慢的那个时钟,也就是最小的那个水位线为准。
窗口(Window)
Flink是一种流式计算引擎,主要是用来处理无界数据流的。想要更加方便的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(window)。在Flink中,窗口就是用来处理无界流的核心。
由于存在迟到数据的问题,将窗口视为一个框可能并不是最合适的。我们可以把它理解成一个“桶”(bucket):每个数据都会分发到对应的桶中,当到达窗口的结束时间时,就对每个桶中收集的数据进行计算处理。
窗口的分类
- 按照驱动类型分类:
- 1)时间窗口,时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据;
- 2)计数窗口,计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。
-
按照窗口分配数据的规则分类:1)滚动窗口,2)滑动窗口,3)会话窗口,4)全局窗口;
窗口API概览
在定义窗口操作之前,需要先确定到底是基于按键分区的数据流KeyedStream还是在没有按键分区的DataStream上面开窗。也即调用窗口算子之前是否有keyBy操作。
而在API上面的区别也是非常小:
// 按键分区
stream.keyBy(...).window(...)
// 非按键分区
stream.windowAll(...)
窗口分配器(Window Assigner)
定义窗口分配器(Window Assigner)是构建窗口算子的第一步,他的作用就是定义数据应该被分配到哪个窗口。通过向上一节中的window/windowAll函数中传入WindowAssigner参数,返回WindowStream。
不同窗口类型有不同的窗口分配器。
1、时间窗口
// 滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))
.aggregate(...)
// 滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
.aggregate(...)
// 处理时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
// 滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
// 滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
// 事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
.aggregate(...)
// 2、计数窗口
// 滚动计数窗口, 定义一个长度为10的滚动计数窗口
stream.keyBy(...)
.countWindow(10)
// 滑动计数窗口,长度为10,步长为3
stream.keyBy(...)
.countWindow(10, 3)
// 3、全局窗口, 全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
stream.keyBy(...)
.window(GlobalWindows.create())
窗口函数(Window Functions)
在上面定义了窗口分配器,我们只是知道了数据属于哪个窗口,而本节介绍的窗口函数则是如何将这些窗口中的数据收集起来,即如何处理。
窗口函数是作用在windowStream上面的,返回的是DataStream。各种stream间的转换如下:
1、增量聚合函数
为了提高实时性,我们可以像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
ReduceFunction:
package com.whu.chapter06
import com.whu.chapter05.{ClickSource, Event}
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
object WindowFunctionDemo {
def main(args:Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(new ClickSource())
// 数据源中的时间戳是单调递增的,所以使用下面的方法,只需要抽取时间戳就好了
// 等同于最大延迟时间是0毫秒
.assignAscendingTimestamps(_.timeStamp)
.map(r => (r.user, 1L))
// 使用用户名对数据流进行分组
.keyBy(_._1)
// 设置5秒钟的滚动事件时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 保留第一个字段,针对第二个字段进行聚合
.reduce((r1, r2) => (r1._1, r1._2+r2._2))
.print()
env.execute()
}
}
AggregateFunction
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
AggregateFunction 在源码中的定义如下:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
// 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
ACC createAccumulator();
// 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程
ACC add(IN value, ACC accumulator);
// getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出
OUT getResult(ACC accumulator);
// 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用
ACC merge(ACC a, ACC b);
}
AggregateFunction接受3个数据类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 通过为每条数据分配相同的key,来将数据发送到同一个分区
.keyBy(_ => "key")
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
.aggregate(new AvgPv)
env.execute()
class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
// 创建空累加器,类型是元组,元组的第一个元素类型为Set数据结构,用来对用户名去重
// 第二个元素用来累加pv操作,也就是没来一条数据就加一
override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)
// 累加规则
override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
(acc._1+in.user, acc._2+1)
}
// 获取窗口关闭时向下游发送的结果
override def getResult(acc: (Set[String], Double)): Double = {
acc._2/(acc._1.size.toDouble)
}
// merge方法只有在事件时间的会话窗口时,才需要实现,这里无需实现
override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
}
全窗口函数(Full Window Functions)
窗口操作中的另一大类就是全窗口函数,与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。
// 窗口函数
stream.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
处理窗口函数ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象(context)”。这个上下文对象不仅有窗口信息,还可以访问当前的时间和状态信息。这里的时间包括了处理时间(process time)和事件时间水位线(event time watermark)。这使得ProcessWindowFunction更加灵活、功能更加丰富,可以认为是一个增强版的WindowFunction。
// Full WindowFunction
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 为所有数据都指定同一个key,可以将所有数据发送到同一个分区
.keyBy(_ => "key")
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow)
.print()
env.execute()
// 自定义窗口处理函数
class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
//
override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
// 初始化一个Set数据结构,用来对用户名进行去重
var userSet = Set[String]()
// 将所有用户进行去重
elements.foreach(userSet += _.user)
// 结合窗口信息,包装输出内容
val windowStart = context.window.getStart
val windowEnd = context.window.getEnd
out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 独立访客数为:" + userSet.size)
}
增量和聚合函数结合使用
增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作,窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数
据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。
// 全窗口函数和聚合函数结合使用
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 使用url作为key对数据进行分区
.keyBy(_.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// 注意这里调用的是aggregate方法
// 增量聚合函数和全窗口聚合函数结合使用
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
.print()
class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
override def createAccumulator(): Long = 0L
// 每来一个事件就加1
override def add(in: Event, acc: Long): Long = acc + 1L
// 窗口闭合时发送的计算结果
override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = ???
}
case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)
class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
// 迭代器中只有一个元素,是增量聚合函数在窗口闭合时发送过来的计算结果
override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
}
}
其它API
- 触发器(Trigger):触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程;Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现;
- 移除器(Evictor):移除器主要用来定义移除某些数据的逻辑;
- 允许延迟(Allowed Lateness):为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算;
- 将迟到的数据放入侧输出流:Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些本该被丢弃的数据。
窗口的生命周期
熟悉了窗口 API 的使用,这里梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结:
- 窗口的创建;
- 窗口计算的触发;
-
窗口的销毁;
迟到数据的处理
所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。
- 设置水位线延迟时间;
- 允许窗口处理迟到数据;
- 将迟到数据放入窗口侧输出流;
package com.whu.chapter06
import com.whu.chapter05.{ClickSource, Event}
import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
object ProcessLateDataDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 为方便测试,读取socket文本流进行处理
val stream = env.socketTextStream("localhost", 7777)
.map(data => {
val fields = data.split(",")
Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
})
// 方式1:设置Watermark延迟时间 2秒钟
val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
// 最大延迟时间设置为5秒钟
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
.withTimestampAssigner( new SerializableTimestampAssigner[Event] {
override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
})
)
// 定义侧输出流标签
val outputTag = OutputTag[Event]("late")
val res2 = stream
.keyBy(_.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
.allowedLateness(Time.minutes(1))
// 方式三:将最后的迟到数据输出到侧输出流
.sideOutputLateData(outputTag)
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
res2.print()
res2.getSideOutput(outputTag).print("late")
// 为方便观察,可以将原始数据也输出
stream.print("input")
}
}