数据采集之Flume+Kafka

Flume简介

1. Flume特点

flume是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的,高可靠的,分布式等特性。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

2. Flume核心概念

  • Agent 使用JVM运行Flume 每台机器运行一个agent , 但是可以在一个agent中包含多个sources和sinks
  • Client 生产数据 , 运行在一个独立的线程
  • Source 从Client收集数据 , 传递给Channel
  • Sink 从Channel收集数据 , 运行在一个独立线程
  • Channel 连接 sources 和 sinks ,这个有点像一个队列
  • Events 可以是日志记录、 avro 对象等

Flume快速开发

1. 安装

  • yum 方式下载安装 :
[mis-ecif@hadoop10-4-0-226 ~]$ yum install flume 

解压文件,若打印如下信息,解压缩报错 ,可能是包没下载完全,重新下载重试即可

[mis-ecif@hadoop10-4-0-226 ~]$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
gzip: stdin: unexpected end of file  
tar: Unexpected EOF in archive  
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now

若解压成功,可检测安装是否成功:/usr/local/flume/bin/flume-ng version
打印以下信息,则表示安装成功了

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

若打印以下信息,可能是因为安装了hbase,将Hbase的hbase-env.sh文件中HBASE_CLASS注释掉即可

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Could not find or load main class org.apache.flume.tools.GetJavaProperty #加载不了该类
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

2. 开发

  • 更改Flume配置文件
cd /usr/local/flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值
  • 创建Flume启动使用到的配置文件 exec_tail.conf
[root@hadoop10-1-0-144 conf]# vi /local/flume/conf/exec_tail.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. 测试

  • 启动Flume
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • 往Flume监控日志中添加数据
echo 'phoneinfo||223.104.7.66||OPPO R9sk||6.0.1||天津市||2017-07-25 06:53:23||中国移动||yingyongbao
' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

echo 'phoneinfo||101.38.64.172||iPhone 6 Plus||10.3.2||北京市||2017-07-25 07:11:40||中国联通' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

控制台若有数据打印,则表示测试成功

4. 更改配置,与kafka集成

  • 将消息传给 kafka
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_demo
#a1.sinks.k1.brokerList = 10.1.0.141:9092,10.1.0.142:9092,10.1.0.143:9092,10.1.0.144:9092
#Kafka集群Broker列表
a1.sinks.k1.brokerList = 10.1.0.144:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
  • 将消息缓存在本地文件系统中 --建议将消息缓存在本地文件系统
# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpoint = /mnt/disk1/flume/checkpoint #检查点文件存储路径
a1.channels.c1.dataDirs = /mnt/disk1/flume/data #消息数据存储路径
  • 创建kafka topic
./kafka-topics.sh --zookeeper 10.1.0.144:2181 --create --topic flume_demo --partition 3 --replication-factor 1
  • 查看topic
./kafka-topics.sh  --list --zookeeper 10.1.0.144:2181
  • 启动kafka consumer,接收flume消息
./kafka-console-consumer.sh --topic flume_demo  --bootstrap-server 10.1.0.144:9092
  • 重启Flume
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • 往flume监控文件中添加日志
echo 'phoneinfo||116.227.248.47||HUAWEI MT7-CL00||6.0||||2017-07-25 07:21:36||中国移动||yingyongbao'  >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

查看kafka consumer窗口,若能够正常接收消息,则表示集成kafka成功。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容