Environment:
System: CentOS 7
PS: make sure Kafka, ZooKeeper, and Storm are already deployed (this article was tested on a cluster built with Apache Ambari).
Packages:
$ yum install -y gcc python-devel java cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-md5 cyrus-sasl-plain librdkafka-devel redis
- Install lein
$ wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
$ mv lein /usr/bin/
$ chmod a+x /usr/bin/lein
$ wget https://github.com/technomancy/leiningen/releases/download/2.8.1/leiningen-2.8.1-standalone.zip
$ mkdir -p /root/.lein/self-installs
$ mv leiningen-2.8.1-standalone.zip /root/.lein/self-installs/leiningen-2.8.1-standalone.jar  # the 2.8.1 "zip" is in fact the standalone jar lein looks for
$ export LEIN_ROOT=1
$ lein version # test lein version
- Create virtualenv
$ pip install virtualenv
$ virtualenv venv && source venv/bin/activate
$ pip install streamparse confluent-kafka redis kazoo
Overall architecture
Start demo
- get kafka brokers
  - find the ZooKeeper cluster (through Ambari)
  - get the brokers (save the script below as get_broker_list.py; the later scripts import it)
from kazoo.client import KazooClient
import json

def get_kafka_brokers(host):
    # read the broker registrations Kafka keeps under /brokers/ids in ZooKeeper
    zookeeper = KazooClient(hosts=host, read_only=True)
    zookeeper.start()
    for node in zookeeper.get_children('/brokers/ids'):
        data, stats = zookeeper.get('/brokers/ids/' + node)
        props = json.loads(data)
        yield props['host'] + ':' + str(props['port'])
    zookeeper.stop()

if __name__ == "__main__":
    print ','.join(get_kafka_brokers("cluster1.dc.com,cluster2.dc.com"))
Output: cluster2.dc.com:6667
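For reference, each /brokers/ids/<id> znode stores the broker's registration as JSON, roughly of this shape (the exact fields vary by Kafka version; the values here are illustrative only):

{"host": "cluster2.dc.com", "port": 6667, "version": 4,
 "endpoints": ["PLAINTEXT://cluster2.dc.com:6667"], "jmx_port": -1}

This is why reading props['host'] and props['port'] is enough to build the bootstrap list.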
Confirm the Kafka cluster through Ambari, as shown in the figure.
- producer writes data to the brokers (using confluent-kafka)
# -*- coding:utf-8 -*-
import confluent_kafka
import random
import time
import json
from get_broker_list import get_kafka_brokers

def error_cb(err):
    print('Error: %s' % err)

def main():
    # bootstrap_servers = 'cluster2.dc.com:6667'
    zk_host = 'cluster1.dc.com,cluster2.dc.com'
    bootstrap_servers = ','.join(get_kafka_brokers(zk_host))
    conf = {'bootstrap.servers': bootstrap_servers,
            'api.version.request': True,
            'error_cb': error_cb,
            'debug': 'protocol',
            'broker.address.family': 'v4'}
    producer = confluent_kafka.Producer(**conf)
    user_list = ['jason', 'jane', 'tom', 'jack']
    while True:
        data = {"user": random.choice(user_list),
                "timestamp": time.time(),
                "log_level": random.randint(0, 5)}
        try:
            producer.produce('test', value=json.dumps(data))
            # time.sleep(random.randint(1, 2))
        except BufferError:
            # local queue is full: serve delivery callbacks, then retry
            producer.poll(100)
            continue
    producer.flush()  # only reached if the loop above is ever broken

if __name__ == '__main__':
    main()
Partial results are shown in the figure:
Create a consumer to verify:
#!/usr/bin/env python
import time
import json
from confluent_kafka import Consumer, KafkaException, KafkaError
from get_broker_list import get_kafka_brokers

def main():
    # broker = 'cluster2.dc.com:6667'
    zk_host = 'cluster1.dc.com,cluster2.dc.com'
    bootstrap_servers = ','.join(get_kafka_brokers(zk_host))
    group = 'test.py'
    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': group,
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'}}
    consumer = Consumer(**conf)
    consumer.subscribe(['test'])
    while True:
        msg = consumer.poll()
        try:
            print json.loads(msg.value())
        except Exception:
            # poll() may return None or a non-JSON message; back off and retry
            time.sleep(1)
            continue
    consumer.close()  # only reached if the loop above is ever broken

if __name__ == '__main__':
    main()
Partial results are shown in the figure:
- integrate with Storm (using the streamparse package)
The Kafka producer above generates user records; the Storm demo counts how many records are produced within each 5-minute window. (In practice the producer runs much faster than the consumer, which causes lag; in this test roughly 1.5M-1.8M records were written per 5 minutes, and the bottleneck came from limited resources, so the numbers are for reference only.)
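To make the 5-minute counting concrete before diving into the topology, here is a minimal standalone sketch of the window bucketing the bolt below performs (the function name window_key is mine, not part of the demo):

import time
from datetime import datetime, timedelta

def window_key(ts, interval_minute=5):
    # round the timestamp down to the start of its 5-minute window
    start = datetime.fromtimestamp(int(ts))
    start -= timedelta(minutes=start.minute % interval_minute,
                       seconds=start.second,
                       microseconds=start.microsecond)
    begin = int(time.mktime(start.timetuple()))
    # same 'timestamp1-timestamp2' format used as the Redis key later
    return '{0}-{1}'.format(begin, begin + interval_minute * 60)

print window_key(time.time())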
$ sparse quickstart onlineuser
The files under topologies, bolts, and spouts may still carry wordcount-style names from the quickstart; renaming them is optional. Just make sure the topology under topologies references the matching spout under spouts and bolt under bolts, as in the layout sketched below.
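After the renames used in this article, the files edited below sit in the project roughly like this (other quickstart files omitted):

onlineuser/
├── project.clj
├── topologies/onlineuser.py
├── spouts/user.py
└── bolts/serializer_log.py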
- vim spouts/user.py
import sys, os
# sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../../kafka_example')
abspath = ""  # must be filled with an absolute path; os.path.abspath will not work because the topology runs as a jar under /tmp
if not abspath:
    raise Exception("set abspath to the kafka_example directory so get_broker_list can be imported")
sys.path.append(abspath)

from confluent_kafka import Consumer
from streamparse import Spout

from get_broker_list import get_kafka_brokers


class OnlineUserSpout(Spout):
    outputs = ['log']

    def initialize(self, stormconf, context):
        # broker = 'cluster2.dc.com:6667'
        zk_host = 'cluster1.dc.com,cluster2.dc.com'
        broker = ','.join(get_kafka_brokers(zk_host))
        group = 'test.py'
        conf = {'bootstrap.servers': broker,
                'group.id': group,
                'session.timeout.ms': 6000,
                'default.topic.config': {'auto.offset.reset': 'smallest'}}
        self.consumer = Consumer(**conf)

    def activate(self):
        self.consumer.subscribe(['test'])

    def next_tuple(self):
        msg = self.consumer.poll()
        if msg is not None and msg.value():
            self.emit([msg.value()])

    def deactivate(self):
        self.consumer.close()
- vim bolts/serializer_log.py
(This part does not handle duplicate users.)

import json
import time
from datetime import datetime, timedelta
from redis import StrictRedis
from streamparse import Bolt


class RedisLog(Bolt):
    def initialize(self, conf, ctx):
        self.redis = StrictRedis()
        self.interval_minute = 5

    def _increment(self, duration):
        return self.redis.incr(duration)

    def process(self, tup):
        data = json.loads(tup.values[0])
        user = data['user']  # not used yet (no deduplication)
        timestamp = data["timestamp"]
        # round the timestamp down to the start of its 5-minute window
        now = datetime.fromtimestamp(int(timestamp))
        now = now - timedelta(minutes=now.minute % self.interval_minute,
                              seconds=now.second,
                              microseconds=now.microsecond)
        now_timestamp = int(time.mktime(now.timetuple()))
        duration = '{0}-{1}'.format(now_timestamp,
                                    now_timestamp + self.interval_minute * 60)
        count = self._increment(duration)
        self.emit([duration, count])
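One optional refinement, not in the original demo: since a window key stops being incremented once its 5-minute interval has passed, _increment could set a TTL on first write so old windows clean themselves up (the 24-hour retention below is an arbitrary choice):

    def _increment(self, duration):
        count = self.redis.incr(duration)
        if count == 1:
            # first write for this window: expire the key after 24 hours
            self.redis.expire(duration, 24 * 3600)
        return count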
- vim topologies/onlineuser.py
""" Online User topology """ from streamparse import Topology from bolts.serializer_log import RedisLog from spouts.user import OnlineUserSpout class OnlineUserCount(Topology): log_spout = OnlineUserSpout.spec() count_bolt = RedisLog.spec(inputs=[log_spout])
$ sparse run
# must be run inside the sparse quickstart project directory (this takes a while; the topology is built into a jar under /tmp and executed from there)
Partial results are shown in the figure. (There may be some warnings caused by write latency on the ZooKeeper transaction log, which can affect Storm performance; ignore them for this test.)
You can check the key/value pairs through Redis (each key is an integer timestamp interval, in the format 'timestamp1-timestamp2'):
import time
from redis import StrictRedis

redis = StrictRedis()
while 1:
    keys = redis.keys()
    if keys:  # mget raises an error when given an empty key list
        vals = redis.mget(keys)
        kv = zip(keys, vals)
        print kv
    time.sleep(10)
Result: roughly as shown in the figure.
Workarounds for some problems that may come up:
- When running sparse run, a Storm version mismatch error may be thrown; fix it in project.clj. Because Storm may also have been installed through Ambari, the reported version string has a different format (a Hortonworks Data Platform version number such as '1.1.0.2.6.2.0-205'). In that case, edit xxx/xxx/site-packages/streamparse/cli/run.py around lines 48-49 to adjust the version check or remove it.
- When running sparse run, you may hit NoClassDefFoundError: org/apache/commons/lang/StringUtils.
The fix:
$ wget https://www.apache.org/dist/commons/lang/binaries/commons-lang-2.6-bin.zip
$ unzip commons-lang-2.6-bin.zip
$ cd commons-lang-2.6 && mv commons-lang-2.6.jar storm/lib
Ending
Some other issues may still come up along the way; check the documentation of the corresponding project.
Finally, this article is original work; please do not repost without permission. =_=!