Flink-Streaming-overview

Flink中的流应用就是在数据流上应用各种转化(如:filter,update state,difine window,aggregation)。数据流有各种数据源创建而来(如:消息队列,socket流,文件等)。结果输出到sink,如写入文件或者标准输出。Flink程序可以在多种上下文中运行,standalone,内置在其他应用中等。应用可以在本地JVM中执行,也可以在集群的许多机器中执行。

示例程序


下面的程序是一个完成的应用,它演示了如何在web soscke上使用window统计5秒内的字数。你可以复制代码然后在你本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

运行应用前,先使用 netcat 在命令行中开启输入流:

nc -lk 9999

输入一些单词就会返回新的结果。这些单词或作为字数统计应用的输入。如果你想看到统计值大于1,可以在5秒内一遍又一遍的输入相同的单词(如果你做不到,可以增加window的大小)

数据源 Data Source


Source指的是你的程序从哪里读取它的输入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加数据源。Flink自带了一些实现好的数据源函数,淡然你可以实现 SourceFunction 来实现自定义的非并行的source或者实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来实现并行的source。
StreamExecutionEnvironment有一些实现定义好的数据源方法:

基于文件的数据源:

  • readTextFile(path) - 读取text文件。也就是说使用 TextInputFormat 一行一行的读取数据。
  • readFile(fileInputFormat,path) - 使用给定的 input format读取文件
  • readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) - 这个方法在flink内部,被上面的两个方法所调用。它使用给定的fileInputFomat读取path中的文件。根据 watchType 的值,数据源会定期(interval 毫秒)监控path中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者仅对当前path下的文件进行一次处理,然后退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter ,用户可以排除不想要处理的文件。
    实现:
    在内部,Flink将读取文件分为两个子任务,分别叫做 目录监控 与 数据读取。每个任务都是单独运行的。目录监控是一个单线程的任务,而数据读取任务可以是多线程的并发任务。数据读取任务的并发度取决于job的并发度。目录监控的功能在于定期监控目录,发现需要被处理的文件,将它们分片 split 然后指定分片给下游的reader。reader会进行实际读取数据的操作。每一个split仅会被一个reader读取,而一个reader可能会读取多个split(依次读取)。
    重要说明:
    1.如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当文件被修改后,它的内容会被全部重新进行处理。这会破坏“精确一次”的语义,因为向文件中追加数据,会导致整个文件进行重新处理。
    2.如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source只会扫描path一次然后退出,而不会等到reader读取所有数据完毕后再退出。当然,reader会继续进行数据读取,直到所有文件内容都读取完毕。关闭source会导致之后不会再有checkpoint。这将导致故障恢复时,需要等待更长的时间,因为job会从上次checkpoint处会进行重新读取数据。

基于socket:

  • socketTextStream - 从socket读取数据。数据可以被 分隔符delimiter 分隔开

基于集合:

  • fromCollection(Collection) - 从java集合中创造数据流。所有集合中的数据必须是同样的类型
  • fromCollection(Iterator,Class) - 从iterator中创造数据流。class参数指定了iterator返回的数据的类型
  • fromElements(T ...) - 从给定的对象序列中创造数据流。所有对象必须是相同的类型
  • fromParallelCollection(SplitableIterator , Class) - 从iterator中并行的创造数据流。class参数指定了iterator返回的数据的类型
  • generateSequence(from,to) - 使用给定的interval并行的生成数字序列

自定义:

  • addSource - 使用source function。如,从Kafka中读取数据,你可以使用 addSource(new FlinkKafkaConsumer08<>(...)).

DataStream Transformations


查阅 operator 文档

Data Sink


Data Sink读取数据流,并将它们写入到file,socket,其他系统或者打印它们。Flink自带了一些output format,它们被封装到一些操作符中:

  • writeAsText() / TextOutputFormat - 将数据作为一整行string,写入文件。通过调用数据的toString方法
  • writeAsCsv(...) / CsvOutputFormat - 将 tuple 以逗号分隔,写入文件。行与行以及field之间的分隔符可以自定义。每一个field的值,是通过调用toString方法获取的
  • pring() / pringToErr() - 将数据的toString方法的值打印到口红纸条。可以选择前缀,在打印输出内容前先打印前缀。这能够区分不同的print的内容。如果并发度大于1,输出同样会打印一个task的标识符。
  • writeUsingOutputFormat() / FileOutputFormat - 使用自定义文件输出的基类与方法。支持自定义的 对象-字节 的转化。
  • writeToSocket - 根据 SerializationSchema 将数据写入socket
  • addSink - 调用传入的自定义 sink function。Flink通过实现 sink function可以与其他系统连接起来(如kafka)

注意的是 write*() 方法主要用于调试的目的。它们没有参与flink的checkpoint过程,这就意味着使用这些函数是“at-least-once”至少一次语义。数据如何写入目标系统是由OutputFormat决定的,也就是说发送到OutputFormat的数据并不一定会立即写入目标系统(如批量写入情况)。因此,在遇到故障时,这些数据有可能会丢失。
为了稳定地,精确一致的将流数据写入问加你系统,建议使用 flink-connector-filesystem。当然,如果自定义了sink function,通过 addSink 添加该自定义的sink,也可以参与flink的checkpoint过程,保持 exactly-once 语义。

Iterator


迭代流程序实现了step function,并且内置在 IterativeStream中。由于DataStream程序可能不会停止,因此iteration中不会有最大数量限制。你需要定义流中的哪些数据需要继续迭代,哪些数据可以发送到下游的操作符,你可以使用split或者filter实现。下面我们使用 filter 来演示。首先,我们定义一个 IterativeStream :

IterativeStream<Integer> iteration = input.iterate();

然后,我们定义在循环中,需要对数据流做哪些操作(下面我们就简单的使用map作为演示)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

为了定义迭代器何时关闭,可以调用 IterativeStream 的 closeWith(feedbackStream) 方法。传入 closeWith() 的数据流会再次进入迭代器,放到迭代器的head。一个常用的模式是,使用filter将流的一部分重新放入迭代器,而另一部分下发到下游操作符这些filter可以定义“终止”的逻辑,也就是一个数据可以不再进入迭代器,而是被转发到下游操作符。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,下面的程序就是对数据进行减1操作,直到为0:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Execution Parameter 执行参数


StreamExecutionEnvironment 包括 ExecutionConfig ,它允许设置运行时所需的job配置。
请参阅 execution configuration 获取更多参数的解释。下面的参数仅属于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds) : 设置 watermark 发射的间隔。你可以公共 long getAutoWatermartkInterval() 获取当前的值。

故障容忍

查阅 State & Checkpointing

控制延迟


默认情况下,数据在网络间传输时,并不是一个一个的传输(造成不必要的网络拥堵),而是缓存后一起传输。buffer的大小可以在Flink 的配置文件中配置。尽管这种方式可以优化吞吐率,但是当输入流的速度不够快时,会造成延迟问题。为了平衡吞吐率和延迟,你可以使用 env.setBufferTimeout(timeoutMillis) 来设置最大等待时间。超过这个时间后,即便buffer没有填满,也要发出去。默认值为100ms。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大化吞吐率,设置 setBufferTimeout(-1) 会移除超时设置,仅当buffer填满后才发送。为了最小化延迟,设置超时的值接近0(如 5 或 10 毫秒)。应该避免设置值为0,因为这会导致服务性能下降。

调试 Debugging

在提交任务到分布式集群运行前,最好确认程序可以按预期运行。因此,实现一个数据分析应用,通常是一个增量的过程:检查结果,调试,优化。
Flink提供了本地IDE调试的功能,简化了数据分析应用的开发。包括加载测试数据,收集结果数据。这一部分会显示如何简化flink程序的开发,便于测试调试程序。

本地运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
加载测试数据
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:需要提供数据烈性,iterator要实现 Serializable。不能并发执行

迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,738评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,377评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,774评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,032评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,015评论 5 361
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,239评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,724评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,374评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,410评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,457评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,132评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,733评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,804评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,022评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,515评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,116评论 2 341

推荐阅读更多精彩内容