今天花了半天时间抽空看了一下flume实时接入,结合数据仓库中有部分报表有着准实时刷新的需求,需要抽数阶段近乎实时,为后面统计计算节省时间。虽然flume接入关系型数据库数据并不太合适,比如源系统删除、更新数据,flume无法处理,但是对于日志接入这种只有插入的场景还是比较合适的。
下面介绍下flume:
一、Flume的概念
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力
二、Flume的处理流程
Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
三、Flume的架构介绍
Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。
- Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
- Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
- Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。
四、Flume的运行机制
Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
五、Flume的安装与配置
- 下载 flume wget http://mirrors.shuosc.org/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
- 解压 tar -xf apache-flume-1.8.0-bin.tar.gz
- 如果需要连接数据库 需要下载 flume-ng-sql-source-1.4.3.jar 以及mysql 驱动jar
- vim flume.conf
下面配置为接入mysql
gent.sinks = HDFS
agent.sources = sql-source
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
#mysql conn
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://xx.xx.xx.xx:3306/data_dev
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = xxxxxxx
agent.sources.sql-source.hibernate.connection.autocommit = true
agent.sources.sql-source.table = src_table_detail
agent.sources.sql-source.start.from = 0
agent.sources.sql-source.custom.query = select * from src_table_detail where id > $@$ order by id
agent.sources.sql-source.batch.size = 100
agent.sources.sql-source.max.rows = 100
agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent.sources.sql-source.run.query.delay=5000
# Status file is used to save last readed row
agent.sources.sql-source.status.file.path = /home/xxxx/log
agent.sources.sql-source.status.file.name = sql-source.sqlSource.status
agent.sources.sql-source.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sql-source.hibernate.c3p0.min_size=1
agent.sources.sql-source.hibernate.c3p0.max_size=10
#hdfs
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://nbd-hdfs/user/hive/warehouse/xxx.db
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
启动flume bin/flume-ng agent -c conf/ -f conf/flume.conf -n agent --no-reload-conf -Dflume.root.logger=INFO,console
mysql新增的数据会实时进入hdfs