第五章 DataStream API (基础篇)

一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上由以下几部分构成:

  • 获取执行环境(Execution Environment);
  • 读取数据源(Source);
  • 定义基于数据的转换操作(Transformations);
  • 定义计算结果的输出位置(Sink);
  • 触发执行程序;


5.1 执行环境

5.1.1 创建执行环境

创建执行环境,通过调用StreamExecutionEnviroment类的的静态方法。具体有三种:

  • StreamExecutionEnvironment.getExecutionEnvironment,它会根据当前运行的上下文
    直接得到正确的结果;也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的
    运行环境;
  • StreamExecutionEnvironment.createLocalEnvironment, 这个方法返回一个本地执行环境;
  • StreamExecutionEnvironment.createRemoteEnvironment, 这个方法返回集群执行环境,调用时需要指定JobManager的主机号和端口号,并指定要运行的jar包;

5.1.2 执行模式

  • 流执行模式(streaming);
  • 批执行模式(batch),有两种方式进行配置:
    • 命令行配置:bin/flink run -Dexecution.runtime-mode=BATCH ...;
    • 代码中进行配置:env.setRuntimeMode(RuntimeExcutionMode.BATCH);
  • 自动模式(automatic),在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

5.2 数据源算子(SOURCE)

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source)。因此,Source就是整个处理程序的输入端。

Flink有多种读取源数据的方式:

// 定义一个模拟的用户行为样例类
case class Event(user:String, url:String, timestamp:Long)

// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 1、从集合读取数据
val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
val stream1 = env.fromColletctions(clicks)
// 也可以直接将元素列举出来通过fromElements进行读取数据
val stream1 = env.fromElements(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))

// 2、从文件读取数据:可以是目录/文件,可以是hdfs文件,也可以是本地文件
val stream2 = env.readTextFile("clicks.csv")

// 3、从socket读取数据
val stream3 = env.socketTextStream("localhost", 777)

// 4、从kafka读取数据。需要添加依赖 连接工具 flink-connector-kafka
// 创建 FlinkKafkaConsumer 时需要传入三个参数:
// (1) topic,定义了从哪些主题中读取数据;
// (2) 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema, 反序列化方式;
// (3) Properties 对象,设置了 Kafka 客户端的一些属性;
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// 创建kafka相关配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//创建一个 FlinkKafkaConsumer 对象,传入必要参数,从 Kafka 中读取数据
val stream = env.addSource(new FlinkKafkaConsumer[String](
  "clicks",
  new SimpleStringSchema(),
  properties
))

上面介绍的是直接通过API读取数据源。另一种比较复杂的方式是自定义数据源,然后通过env.addSource进行读取。

自定义数据源需要实现SourceFunction接口。主要需要重写两个关键方法:

  • run()方法,使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法,通过标识位控制退出循环,来达到中断数据源的效果;
package com.whu.chapter05

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import java.util.Calendar
import scala.util.Random


// 调用
// val stream = env.addSource(new ClickSource)


case class Event(user: String, url: String, timestamp: Long)

// 实现 SourceFunction 接口,接口中的泛型是自定义数据源中的类型
class ClickSource(sleepTime:Long=1000L) extends SourceFunction[Event] {
  // 标志位,用来控制循环的退出
  var running = true

  // 重写run方法,使用上下文对象sourceContext调用collect方法
  override def run(ctx: SourceContext[Event]): Unit = {
    // 实例化一个随机数发生器
    val random = new Random()
    // 供随机选择的用户名数组
    val users = Array("Marry", "Bob", "Jack", "Cary")
    // 供选择的url数组
    val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")

    // 通过while循环发送数据,running默认为true,所以会一直发送数据
    while (running) {
      // 调用collect方法向下游发送数据
      ctx.collect(Event(
        users(random.nextInt(users.length)),
        urls(random.nextInt(urls.length)),
        Calendar.getInstance.getTimeInMillis // 当前时间戳
      ))
      // 每隔一秒生成一个点击事件,方便观测
      Thread.sleep(sleepTime)
    }
  }

  override def cancel(): Unit = {
    // 通过将running设置为false来终止数据发送
    running = false
  }
}

5.3 转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,讲一个或多个DataStream转换为新的DataStream。

5.3.1 基本转换算子

  • map, 一个个进行数据转换;
  • filter, 对数据进行过滤;
  • flatmap, 扁平映射,可以理解为先map然后进行flatten;

5.3.2 聚合算子(Aggregation)

  • keyBy, 按键分区。对于Flink来说,DataStream是没有直接进行觉得API的。要做聚合需要先进行分区,这个操作就是通过keyBy来完成的。keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合。对于 POJO 类型或 Scala 的样例类,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。
  • 简单聚合,sum、min、max、minBy、maxBy等。都是在指定字段上进行聚合操作。min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。

指定字段的方式有两种:指定位置,和指定名称。元组通过位置,样例类通过字段名称。

keyBy得到的数据流一般称为KeyedStream。而聚合操作则会将KeyedStream转换为DataStream。

规约聚合(reduce)

与简单聚合类似,reduce操作也会将KeyedStream转换为DataStream。他不会改变流的元素数据类型,输入输出是一致的。

reduce方法来自ReduceFunction接口,该方法接收两个输入事件,经过处理后输出一个相同数据类型的事件。

一个简单的栗子:

import org.apache.flink.streaming.api.scala._

object TransformationDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 添加自定义数据源
    env.addSource(new ClickSource)
      .map(r => (r.user, 1L))
      // 按照用户进行分组
      .keyBy(_._1)
      // 计算每个用户的访问频次
      .reduce((r1, r2) => (r1._1, r1._2+r2._2))
      // 将所有数据分到同一个分区
      .keyBy(_ => true)
      // 通过reduce实现max功能,计算访问频次最高的用户
      .reduce((r1, r2)=> if(r1._2>r2._2) r1 else r2)
      .print()
    
    // 更简单的方法是直接keyBy然后sum然后maxBy就行了,这里只是为了演示reduce用法
    env.execute()
  }
}

5.3.3 用户自定义函数(UDF)

Flink的DataStream API编程风格其实是一致的:基本都是基于DataStream调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。

这个接口有一个共同特定:全部都以算子操作名称 + Function命名,如数据源算子需要实现SourceFunction接口,map算子需要实现MapFunction接口。我们可以通过三种方式来实现接口。这就是所谓的用户自定义函数(UDF)。

  • 自定义函数类;
  • 匿名类;
  • lambda表达式;

接下来对这三种编程方式做一个梳理。

函数类(Function Classes)
package com.whu.chapter05

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object TransformationUDFDemo {
 def main(args:Array[String]): Unit = {

   // 自定义filterFunction类, 并接受额外的参数
   class MyFilter(key:String) extends FilterFunction[Event] {
     override def filter(t: Event): Boolean = {
       t.url.contains(key)
     }
   }

   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)

   // 通过自定义函数类
   val stream1 = env.addSource(new ClickSource)
     .filter(new MyFilter("home"))

   // 通过匿名类
   val stream2 = env.addSource(new ClickSource)
     .filter(new FilterFunction[Event]{
       override def filter(t: Event): Boolean = {
         t.url.contains("home")
       }
     })

   // 最简单的lambda 表达式
   val stream3 = env.addSource(new ClickSource)
     .filter(_.url.contains("home"))
   
   stream1.print("stream1")
   stream2.print("stream2")
   stream3.print("stream3")
   
   env.execute()
 }
}
富函数类(Rich Function Classes)

富函数类也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是已抽象类的形式出现的。例如:RichMapFunction,RichFilterFunction,RichReduceFunction等。

与常规函数类的不同主要在于富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

典型的生命周期方法有:

  • open方法,是RichFunction的初始化方法,会开启一个算子的生命周期。当一个算子的实际工作方法如map、filter等方法被调用之前,open会首先被调用。所以像文件IO流、数据库连接、配置文件读取等等这样一次性的工作,都适合在open方法中完成;
  • close方法,是生命周期中最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

open、close等生命周期方法对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,如map,对于每一条数据都会调用一次。

package com.whu.chapter05

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object RichFunctionDemo {
  def main(args:Array[String]) : Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    env.addSource(new ClickSource(10000))
      .map(new RichMapFunction[Event, Long] {
        // 在任务生命周期开始时会执行open方法,在控制台打印对应语句
        override def open(parameters: Configuration): Unit = {
          println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务开始")
        }
        override def map(in: Event): Long = {
          in.timeStamp
        }

        override def close(): Unit = {
          println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务结束")
        }
      }).print()
    
    env.execute()
  }
}

在上面的例子中可以看到,富函数类提供了getRuntimeContex方法,可以获取运行时上下文信息,如程序执行的并行度,任务名称,任务状态等。

5.3.4 物理分区(Physical Partitioning)

分区(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步计算。keyBy是一种逻辑分区(logic partitioning)操作。

Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作算子,能够帮我们实现数据流的手动重分区。为了同 keyBy()相区别,我们把这些操作统称为“物理分区”操作。

常见的物理分区策略有随机分区、轮询分区、重缩放和广播,还有一种特殊的分区策略— —全局分区,并且 Flink 还支持用户自定义分区策略,下边我们分别来做了解。

随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。

轮询分区(Round-Robin)

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance()使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。


重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,也就是说,“发牌人”如果有多个,那么 rebalance()的方式是每个发牌人都面向所有人发牌;而rescale()的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。



当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale()的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

广播(broadcast)

这种方式其实不应该叫作“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

自定义分区

当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector 接口。

栗子:

package com.whu.chapter05

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitioningDemo {
  def main(args:Array[String]) : Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源
    val stream = env.addSource(new ClickSource())

    // 随机分区(shuffle)
    stream.shuffle.print("shuffle").setParallelism(4)

    // 轮询分区(rebalance, Round-Robin)
    stream.rebalance.print("rebalance").setParallelism(4)

    // 重缩放分区(rescale)
    stream.rescale.print("rescale").setParallelism(4)

    // 广播 (broadcast)
    stream.broadcast.print("broadcast").setParallelism(4)

    // 全局分区(global)
    stream.global.print("global").setParallelism(4)

    // 自定义分区
    stream.partitionCustom(new Partitioner[Event] {
      // 根据 key 的奇偶性计算出数据将被发送到哪个分区
      override def partition(k: Event, i: Int): Int = {
        k.timeStamp.toInt % 2
      }
    }, "user"
    ).print()
    
    env.execute()
  }
}

5.4 输出算子(Sink)

5.4.1 连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个Sink算子,主要就是用来实现与外部系统链接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

与addSource类似,addSink也支持自定义sink算子SinkFunction。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。Flink官方提供了诸多第三方系统连接器:


除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一
些其他第三方系统与 Flink 的连接器:


5.4.2 输出到文件

Flink有一些非常简单粗暴的输出到文件的预实现方法,如writeAsCsv等,目前这些简单的方法已经要被弃用。

Flink专门提供了一个流式文件系统连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且继承了Flink的检查点机制,用来确保精确一次(exactly)的一致性语义。

StreamingFileSink支持行编码(row-encoded)和批量编码(bulk-encoded,比如parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法如下:

  • 行编码:StreamingFileSink.forRowFormat (basePath, rowEncoder);
  • 批量编码:StreamingFileSink.forBulkFormat (basePath,bulkWriterFactory);

在创建行或批量Sink时,我们需要传入两个参数,用来指定存储桶的基本路径和数据的编码逻辑。

package com.whu.chapter05

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.streaming.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import java.util.concurrent.TimeUnit


object SinkToFileDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    val stream = env.addSource(new ClickSource())
    
    val fileSink = StreamingFileSink.forRowFormat(
      new Path("./output"),
      new SimpleStringEncoder[String]("UTF-8")
    )
      // 通过.withRollingPolicy()方法指定滚动逻辑
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withMaxPartSize(1024*1024*1024)
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          .build()
      ).build()
    
    stream.map(_.toString).addSink(fileSink)
  }
}

上面创建了一个简单的文件 Sink,通过 withRollingPolicy()方法指定了一个“滚动策略”。上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

  • 至少包含 15 分钟的数据;
  • 最近 5 分钟没有收到新的数据;
  • 文件大小已达到1GB;

输出到其他系统

略。

参考:
FLink教程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335

推荐阅读更多精彩内容