1. 前言
1.1 说明
本文通过一个Demo程序,演示Flink从Kafka中读取数据,并将数据以JDBC的方式持久化到关系型数据库中。通过本文,可以学习如何自定义Flink Sink和Flink Steaming编程的步骤。
1.2 软件版本
- Centos 7.1
- JDK 1.8
- Flink 1.1.2
- Kafka 0.10.0.1
1.3 依赖jar包
请将以下依赖放在pom.xml中。这里使用的关系型数据是PostgreSQL,也可以换成其它关系型数据库的驱动程序。
<properties>
<flink.version>1.1.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.1-901-1.jdbc4</version>
</dependency>
</dependencies>
2. 自定义Sink
2.1 内置的Streaming Connector
Flink 内置了一些Streaming Connector,用于和第三方的系统交互。截至到当前为止,Flink支持以下Connector。括号中的source代表数据从这些第三方系统中流入Flink中,sink代表数据从Flink流到这些第三方系统中。
- Apache Kafka (sink/source)
- Elasticsearch (sink)
- Elasticsearch 2x (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (sink/source)
- Amazon Kinesis Streams (sink/source)
- Twitter Streaming API (source)
- Apache NiFi (sink/source)
- Apache Cassandra (sink)
- Redis (sink)
除此之外,Flink还允许我们自定义source和sink。本文所述例子是从Kafka中读取数据,并把数据写入数据库中;由于Flink已经内置了Kafka source,因此还需要自定义JDBC sink。
2.2 自定义JDBC sink
下面的代码就是一个JDBC sink的实现,其效果就是向PostgreSQL数据库中插入数据,具体请看代码中的注释说明。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class PostgreSQLSink extends RichSinkFunction<Tuple3<String,String,String>> {
private static final long serialVersionUID = 1L;
private Connection connection;
private PreparedStatement preparedStatement;
/**
* open方法是初始化方法,会在invoke方法之前执行,执行一次。
*/
@Override
public void open(Configuration parameters) throws Exception {
// JDBC连接信息
String USERNAME = "postgres" ;
String PASSWORD = "********";
String DRIVERNAME = "org.postgresql.Driver";
String DBURL = "jdbc:postgresql://192.168.1.213/flink";
// 加载JDBC驱动
Class.forName(DRIVERNAME);
// 获取数据库连接
connection = DriverManager.getConnection(DBURL,USERNAME,PASSWORD);
String sql = "insert into kafka_message(
timeseq, thread, message) values (?,?,?)";
preparedStatement = connection.prepareStatement(sql);
super.open(parameters);
}
/**
* invoke()方法解析一个元组数据,并插入到数据库中。
* @param data 输入的数据
* @throws Exception
*/
@Override
public void invoke(Tuple3<String,String,String> data) throws Exception{
try {
String timeseq = data.getField(0);
String thread = data.getField(1);
String message = data.getField(2);
preparedStatement.setString(1,timeseq);
preparedStatement.setString(2,thread);
preparedStatement.setString(3,message);
preparedStatement.executeUpdate();
}catch (Exception e){
e.printStackTrace();
}
};
/**
* close()是tear down的方法,在销毁时执行,关闭连接。
*/
@Override
public void close() throws Exception {
if(preparedStatement != null){
preparedStatement.close();
}
if(connection != null){
connection.close();
}
super.close();
}
}
3. Flink Streaming Job 编程
3.1 Flink Stream编程的步骤
Flink job 编程基本上都是由一些基本部分组成:
- 获得一个 execution environment
- 加载/创建初始数据(Source)
- 指定在该数据上进行的转换(Transformations)
- 指定计算结果的存储地方(Sink)
- 启动程序执行。
3.2 Kafka-Flink-DB
下面的代码,是一个Flink Job,从Kafka中读取消息,并把消息写到关系型数据库中。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class KafkaToDB {
public static void main(String[] args) throws Exception {
// 解析参数
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!");
System.out.println("\nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> "+
"--zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
// 获取StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
// create a checkpoint every 5 secodns
env.enableCheckpointing(5000);
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(parameterTool);
// source
DataStream<String> sourceStream = env.addSource(
new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"),
new SimpleStringSchema(), parameterTool.getProperties()));
// Transformation,这里仅仅是过滤了null。
DataStream<Tuple3<String, String, String>> messageStream = sourceStream
.map(new InputMap())
.filter(new NullFilter());
//sink
messageStream.addSink(new PostgreSQLSink());
env.execute("Write into PostgreSQL");
}
// 过滤Null数据。
public static class NullFilter implements FilterFunction<Tuple3<String, String, String>>{
@Override
public boolean filter(Tuple3<String, String, String> value) throws Exception {
return value != null;
}
}
// 对输入数据做map操作。
public static class InputMap implements MapFunction<String, Tuple3<String, String, String>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple3<String, String, String> map(String line) throws Exception {
// normalize and split the line
String[] arr = line.toLowerCase().split(",");
if (arr.length > 2) {
return new Tuple3<>(arr[0], arr[1], arr[2]);
}
return null;
}
}
}
4. 把Job提交Flink集群
将上面的代码打包成jar后,通过下面的命令把job提交到Flink集群上。其中-c指定了flink-db.jar的Main class,其余的参数是本文job所用的kafka相关的参数。
bin/flink run -c com.bigknow.flink.KafkaToDB examples/flink-db.jar \
--topic my-topic \
--bootstrap.servers 192.168.1.170:9092 \
--zookeeper.connect 192.168.1.170:2181 \
--group.id test01`
(完)