原文链接:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html
Flink中的DataStream程序是在数据流中实现transformation操作(如:过滤、修改状态、定义窗口、聚合等)的常规程序。数据流通过各种source(如: 消息队列、socket流、文件等)来创建,结果通过sink返回,可能是将数据写入文件中或者标准输出(如:命令行终端输出)。Flink程序可以在不同的情况下执行,以独立的程序执行或者嵌入其他程序中执行。执行过程可以发生在本地JVM中,或者在多个机器组成的集群中。
请参考基本概念: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html,来获取关于Flink API的基本概念的介绍。
为了创建你自己的Flink DataStream程序,我们鼓励你以Flink 程序剖析的结构开始,并将你自己的transformation添加进去。接下来的部分作为额外的操作和高级特性的参考。
编程案例(Example Program)
接下来的程序是一个完整的,流式窗口单词计数的应用案例,单词的计数来自web socket中每5分钟窗口的单词。你可以把这些代码拷贝到你本地去执行:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print
env.execute("Window Stream WordCount")
}
}
为了执行这个例子,你首先要在终端窗口中执行netcat命令来产生输入数据:
nc -lk 9999
只需要输入一些单词然后按下回车键,这些单词就会输入到word count程序中。如果你想看的计数大于1的,只需要在5秒内输入同一个单词即可(如果你不能再5秒内输入同一个单词的话,可以调整一下窗口的大小)。
DataStream的Transformation操作
数据的transformation将一个或者多个DataStream转换成一个或者多个新的DataStream,程序可以将多个transformation操作组合成一个复杂的拓扑结构。
这一章节给出了所有可用的transformation的一个描述:
Transformation操作 描述
Map
DataStream → DataStream: 输入一个参数产生一个参数,map的功能是对输入的参数进行double操作:
dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream: 输入一个参数,产生0个、1个或者多个输出. 这个 flatmap 的功能是将句子中的单词拆分出来:
dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream: 结算每个元素的布尔值,并返回布尔值为true的元素. 下面这个例子是过滤出非0的元素:
dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream: 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素. 在内部是以hash的形式实现的. 请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#specifying-keys来了解如何指定key. 这个操作返回的是一个 KeyedDataStream
.
dataStream.keyBy("someKey") // 通过 "someKey"进行分组
dataStream.keyBy(0) // 通过Tuple的第一个元素进行分组
注意:以下类型是无法作为key的:
1、 POJO类,但是没有重写hashCode()函数并且依赖Object的hashCode()实现
2、 任意形式的数组类型
Reduce
KeyedStream → DataStream: 一个分组数据流的滚动规约操作. 合并当前的元素和上次规约的结果,产生一个新的值.
下面是一个创建部分流的和的reduce函数:
keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream: 一个有初始值的分组数据流的滚动折叠操作. 合并当前元素和前一次折叠操作的结果,并产生一个新的值.
下面的fold函数就是当我们输入一个 (1,2,3,4,5)的序列, 将会产生一下面的句子:"start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations
KeyedStream → DataStream: 分组数据流上的滚动聚合操作. min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy)
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream: Windows 是在一个分区的 KeyedStreams中定义的. Windows 根据某些特性将每个key的数据进行分组 (例如:在5秒内到达的数据). 参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html获取window的描述
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) //最近5分钟的数据
WindowAll
DataStream → AllWindowedStream: Windows 可以在一个常规的 DataStreams中定义. Windows 根据某些特性对所有的流 (例如:5秒内到达数数据). 参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html获取window的描述.
注意: 这个操作在许多情况下并非并行操作. 所有的记录都会聚集到一个windowAll操作的任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // 最近5分钟的数据
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream 将一个通用函数作为一个整体传给window. 下面是一个手动对window中的元素进行求和的函数.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream 给window赋一个reduce功能的函数,并返回一个规约的结果.
windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream 给窗口赋一个fold功能的函数,并返回一个fold后的结果. 对于这个函数,当我们传入 (1,2,3,4,5)这个序列时, 将会得到如下的结果: "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream → DataStream 对window的元素做聚合操作. min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素。(同样的原理适用于 max 和 maxBy).
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream → DataStream 对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream.注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次.
dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream 根据一个给定的key和window对两个DataStream做join操作.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup
DataStream,DataStream → DataStream 根据一个给定的key和window对两个DataStream做Cogroups操作.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect
DataStream,DataStream → ConnectedStreams: 连接两个保持他们类型的数据流.
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream 作用于connected 数据流上,功能与map和flatMap一样
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Split
DataStream → SplitStream: 根据某些特征把一个DataStream拆分成两个或者多个DataStream.
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select
SplitStream → DataStream: 从一个SplitStream中获取一个或者多个DataStream.
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream → IterativeStream → DataStream: 在流程中创建一个反馈循环,将一个操作的输出重定向到之前的操作中. 这对于定义持续更新模型的算法来说是很有意义的. 下面的代码从一个stream开始,不断的应用迭代体. 大于0的元素又被发送回到渠道中, 其余的元素则被转发到下游. 请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#iterations来获取迭代的完整描述.
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps
DataStream → DataStream 提取记录中的时间戳来跟需要事件时间的window一起发挥作用. 更多详情参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html.
stream.assignTimestamps { timestampExtractor }
通过匿名模式匹配从tuple、case class和集合中提取信息,如下:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
API不支持开箱即用,如果你需要这些特性,你需要使用Scala API的扩展
下面的transformation操作可以应用于DataStream的Tuple中:
Transformation | Description |
---|---|
Project DataStream → DataStream |
获取Tuple中的子集:DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0);
|
物理分区(Physical partitioning)
Transformation操作 | 描述 |
---|---|
Custom partitioning DataStream → DataStream |
使用用户自定义的分区来为每一个元素选择具体的task.dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)
|
Random partitioning DataStream → DataStream |
按均匀分布随机划分元素.dataStream.shuffle()
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
循环的为元素分区,为每一个分区创建相等的负载,这在数据倾斜的优化上是非常有用的:dataStream.rebalance()
|
Rescaling DataStream → DataStream |
重复的分区元素到下游操作的子集中. 如果你想将一个source的并行实例 拆分到多个mapper操作的子集中来进行分布式加载, 而又不希望调用rebalance()产生的全量重分区的话, 这个方法是很有用的。 这个函数只会根据其他配置参数如TaskManagers的slot数, 来进行本地的数据传输而不是在网络中进行传输. |
Flink也通过下面的方法为transformation提供确切的流分区底层控制:
Transformation操作 | 描述 |
---|---|
Custom partitioning DataStream → DataStream |
使用用户自定义的分区来为每一个元素选择具体的task.dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)
|
Random partitioning DataStream → DataStream |
按均匀分布随机划分元素.dataStream.shuffle()
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
循环的为元素分区,为每一个分区创建相等的负载,这在数据倾斜的优化上是非常有用的:dataStream.rebalance()
|
Rescaling DataStream → DataStream |
重复的分区元素到下游操作的子集中. 如果你想将一个source的并行实例 拆分到多个mapper操作的子集中来进行分布式加载, 而又不希望调用rebalance()产生的全量重分区的话, 这个方法是很有用的。 这个函数只会根据其他配置参数如TaskManagers的slot数, 来进行本地的数据传输而不是在网络中进行传输. |
上游操作传递元素给的下游操作的子集数目依赖于下游操作和上游操作的并发度. 例如,如果上游操作有2个并发,而下游操作有4个并发,那么上游的一个并发结果分配给下游的两个并发操作,另外的一个并发结果分配给了下游的另外两个并发操作.另一方面,下游有两个并发操作而上游又4个并发操作,那么上游的其中两个操作的结果分配给下游的一个并发操作而另外两个并发操作的额结果则分配给另外一个并发操作.
当并发度与其他的一个或者多个下游操作不同时,上游操作将产生不同数目的输出.
请看这个关于上述例子的连接图表示的图:
dataStream.rescale()
Broadcasting
DataStream → DataStream 将元素广播到每个分区上.
dataStream.broadcast()
任务链和资源组(Task chaining and resource group)
链接两个连续的transformation操作,意味着将这两个操作放在同一个线程中执行来获得更好的性能。Flink默认情况下,会尽可能的将操作链接在一起(例如:两个连续的map操作),如果需要的话,Flink提供了细粒度的链接控制:
如果你想在你的整个作业中禁用链接操作的话,可以使用StreamExecutionEnvironment.disableOperatorChaining()
。通过下面的方法可以获得更多关于链接控制的例子。因为这些操作依赖前一次transformation,所以只能用在DataStream的transformation操作之后,例如 你可以这么使用someStreaam.map(…).startNewChain()
,但是你不能这么使用someStream.startNewChain()
。
在Flink中一个资源组就是一个slot,详情参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#configuring-taskmanager-processing-slots, 如果需要的话,你可以手动的拆分不同的操作到不同的slot中去。
Transformation操作 | 描述 |
---|---|
Start new chain | 从这个操作开始,新启一个新的chain. 两个map操作将链接在一起,而filter将不再和第一个map链接在一起: someStream.filter(...).map(...).startNewChain().map(...)
|
Disable chaining | Map禁用链接操作 |
Set slot sharing group | 设置操作的slot共享组. Flink将把slot共享组的操作放到同一个slot中,而非slot共享组的操作放到其他的slot中. 这个机制可以用来做slot隔离. 如果所有的输入操作都在同一个slot共享组中,那么新的slot共享组将继承自输入操作的slot共享组. 默认的slot共享组叫"default", 所有调用slotSharingGroup("default")的操作都会被放入这个共享组中.someStream.filter(...).slotSharingGroup("name")
|
数据源(Data Sources)
Source就是你程序读取input的地方,你可以通过调用StreamExecutionEnvironment.addSource(sourceFunction)
来添加一个Source到你的程序中,Flink提供了一些预定义的Source函数,但是你也可以通过实现SourceFunction
接口来实现非并行的Source或者实现ParalleSourceFunction
接口或者继承RichParalleSourceFunction
类来实现并行的source。
这里有几个可以通过StreamExecutionEnvironment
获取的预定义stream Source。
基于File的:
readTextFile(path)
--- 一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。
readFile(fileInputFormat, path)
--- 按照指定的文件格式读取文件
readFile(fileInputFormat, path, watchType, interval, pathFilter)
--- 这个方法会被前面两个方法在内部调用,它会根据给定的fileInputFormat来读取文件内容,如果watchType是FileProcessingModel.PROCESS_CONTINUOUSLY
的话,会周期性的读取文件中的新数据,而如果是FileProcessingModel.PROCESS_ONCE
的话,会一次读取文件中的所有数据并退出。使用pathFilter来进一步剔除处理中的文件。
基于Socket的
socketTextStream
---从Socket中读取信息,元素可以用分隔符分开。
基于集合(Collection)的
fromCollection(seq)
--- 从Java的java.util.Collection中创建一个数据流,集合中所有元素的类型是一致的。
fromCollection(Iterator)
--- 从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回
fromElements(elements:_*)
--- 从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。
fromParalleCollection(SplitableIterator)
--- 从一个给定的迭代(iterator)中并行地创建一个数据流,指定元素数据类型的类由迭代(iterator)返回。
generateSequence(from, to)
--- 从给定的间隔中并行地产生一个数字序列。
自定义(Custom)
addSource
--- 附加一个新的数据源函数,例如:你可以使用addSource(new FlinkKafkaConsumer08<>(…))
来读取Kafka中的数据。请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/来了解更多信息。
数据接收器(Data Sinks)
Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。Flink有许多封装在DataStream操作里的内置输出格式:
writeAsText()/TextOutputFormat
--- 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取。
WriteAsCsv(…)/CsvOutputFormat
--- 将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。
Print()/printToErr()
--- 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。
writeUsingOutputFormat()/FileOutputFormat
--- 自定义文件输出的方法和基类,支持自定义对象到字节的转换。
writeToSocket
--- 根据SerializationSchema 将元素写入到socket中
addSink
--- 调用自定义的接收器功能,Flink捆绑连接器(connectors)到外部系统中是通过sink 函数来实现的。
注意:DataStream中的write*()函数主要是调试用的,它们没有加入Flink的checkpoint机制,也就是说这些函数有至少一次(at-least-once)语义。数据流入到目标系统中取决于OutputFormat的实现,这就是说并非所有发送到OutputFormat中的元素都会立即展示在目标系统中,同时,失败的情况下,这些记录会丢失。
通过flink-connector-filesystem可以实现可靠的,仅执行一次地将流发布到文件系统中,通过addSink(…)
方法自定义实现也可以引入Flink 的checkpoint机制来实现exactly-once机制。
迭代(Iterations)
Iterative流程序实现一个阶梯函数,并将其嵌入到一个IterativeStream中,因为DataStream程序是不会结束的,所以没有最大迭代数这一说法。相反,你需要通过一个split操作或者filter操作指定流中的哪个部分需要反馈到迭代中而哪部分发到下游。这里列出了一个迭代,迭代体(重复计算的部分)仅仅是一个map操作,而返回给迭代的元素和下发给下游的元素通过filter来区分:
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
})
默认情况下,反馈流的分区与迭代头的输入一致。用户可以在closeWith函数中设置一个可选的布尔标志来重写这个情况。例如:这有一个程序,从一个整数序列中不断的减1直到结果为0:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
执行参数(Execution Parameters)
StreamExecutionEnvironment中包含了允许在运行时指定配置的ExecutionConfig对象。
请参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html来获取更多参数的描述,这些参数是针对DataStream API的:
enableTimestamps()/disableTimestamps()
:为Source发出的每个事件附加一个时间戳,areTimestampsEnadbled()
将返回当前设置的值
setAutoWatermarkInterval(long milliseconds)
:设置自动水印发布的时间间隔,你可以通过long getAutoWatermarkInterval()
方法来获取当前设置的值。
容错(Fault Tolerance)
State&Checkpoint: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html描述了如何启用和配置Flink的checkpoint机制。
控制延迟(Controlling Latency)
默认情况下,流中的元素并不会一个一个的在网络中传输(这会导致不必要的网络流量消耗),而是缓存起来,缓存的大小可以在Flink的配置文件中配置。这个方法在优化吞吐量上是很好的,但是如果数据源输入不够快的话会导致数据延迟,为了控制吞吐量和延迟,你可以在运行环境中或者某个操作中使用env.setBufferTimeout(timeoutMills)
来为缓存填入设置一个最大等待时间。等待时间到了之后,即使缓存还未填满,缓存中的数据也会自动发送。 这个超时的默认值是100ms。
案例:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
为了最大吞吐量,可以设置setBufferTimeout(-1),这会移除timeout机制,缓存中的数据一满就会被发送。为了最小的延迟,可以将超时设置为接近0的数(例如5或者10ms)。 缓存的超时不要设置为0,因为设置为0会带来一些性能的损耗。
调试(Debugging)
在将一个流程序发布到分布式集群中执行之前,先确认实现的方法能按预期执行是个不错的想法。因此,实现数据分析往往是一个检查结果、调试、改进的增量过程。
Flink通过支持在IDE中的本地调试、注入测试数据和结果数据的采集来降低开发数据分析程序的难度。本章将给出如何缓解Flink开发难度的提示:
本地执行环境(Local Execution Environment)
一个LocalStreamEnvironment在创建它的同一个JVM进程中启动一个Flink系统,如果你在IDE中启动一个LocalEnvironment(本地执行环境)的话, 你就可以在你的代码中设置断点并轻松地调试你的代码了。
一个LocalEnvironment(本地指定环境)可以按如下方法来创建和使用:
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
集合数据源(Collection Data Sources)
Flink提供了一些Java 集合支持的特殊数据源来使得测试更加容易,一旦程序测试成功后,将source和sink替换成从外部系统读或者写的source和sink 将会更加容易。
集合数据源可以按如下方法来使用:
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意:目前,集合数据源要求数据类型和迭代要实现serializable接口,此外,集合数据源不能并发执行(即:并发度只能为1 ,parallelism = 1)
迭代器类型的Data Sink(Iterator Data Sink)
Flink也为测试提供类一个sink来收集DataStream的结果,可以通过下面的方法来使用:
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala