当storm遇上python

storm是什么

他的官方文档是这样介绍的

Storm is a distributed realtime computation system.

关键词:分布式、实时、计算

你什么时候需要storm

当你有海量数据需要进行实时处理的时候,在这种场景下你往往需要利用到多台机器,而且让你关注的某一类数据按一定的规则路由到确切的节点,从而实现对信息流(往往需是有状态的)的连续计算。
实际上分布式计算就是一大堆节点(一般是在多台机器上)之间的互相通信,而storm管理了这些节点,定义了一个计算的模型(topology)让开发者可以忽略很多细节(比如集群管理、消息队列),从而把实现实时分布式计算任务简单化。

storm的哲学

storm的组件

Paste_Image.png
  • Nimbus: 分发代码,分发任务,监控错误
  • Zookeeper: 管理各个组件,保持系统稳定
  • Supervisor: 执行任务,往往多个组成一个拓扑(Topology)

storm的计算模型

Paste_Image.png
  • topology: 拓扑,实际上是一副图,代表了对某个计算过程的描述,他的组成部分有 Spout, Bolt, stream
  • Spout: 产生数据流,数据流的起点
  • Bolt: 接收数据流,执行计算或者重新转发出数据流
  • Stream: 数据流,即上图的箭头
  • Tuple: 数据流在计算模型中是由无数个tuple组成的
    所有的节点在这个拓扑中都是并发执行的。

storm的几种路由方式

路由(grouping)定义了stream如何在各个节点之中流动,下面只介绍几种常见方式,如下:
Shuffle grouping: 洗牌模式。随机平均地发配到下游节点上。
Fields grouping: 按照某一个字段来分配,拥有相同值的字段会分配到同一个节点上(即可连续跟踪某个固定特征的数据流)
Global grouping: 强制到某唯一的节点,实际上如果有多个节点去到任务号最低的节点。
all grouping: 强制到所有节点,需小心使用。
Partial Key grouping: 最新支持的,带负载均衡的Fields grouping。
Direct grouping: 手动指定要流动到的节点。

[关于storm的组成部分与计算哲学的更详细文档]

(http://storm.apache.org/documentation/Concepts.html)

hand on the code

假设你已经安装好了storm(请参照官方文档,或者其他一切所能参照的文档,而且请装0.9.2版本,下面会有说明)。
这时候一般的入门会让你开始你的第一个java程序来提交topology,但如标题所预示, 我们这里会使用python(对,只需要python)来进行示例。
首先我们需要一个开源项目的支持,它叫pyleus(yelp公司出品),这里有一个不幸的消息,它对storm的支持仅到0.9.2(最新版本0.9.4的支持正在开发中)。

第一次的提交

$ pip install pyleus
$ git clone https://github.com/Yelp/pyleus.git
$ pyleus build  pyleus/examples/exclamation_topology/pyleus_topology.yaml
$ pyleus local exclamation_topology.jar

只要以上简单几个操作,即可把这个topology提交到本地,如果没有任何错误我们就可以继续接下来的实际例子。

更有意义的例子-数单词

这个例子是pyleus项目自带的,examples目录下还有其他详细的例子可以参考。

这个Topology的目录树
word_count/
|-- word_count/
|   |-- __init__.py
|   |-- count_words.py
|   |-- line_spout.py
|   |-- log_results.py
|   |--split_words.py
|-- pyleus_topology.yaml
pyleus_topology.yaml

此文件定义了这个拓扑基本组成与数据流动

# An ultra-simple topology which shows off Storm and the pyleus.storm library

name: word_count # 自定义拓扑的名字

topology:

    - spout:
        name: line-spout # 自定义spout组件的名字
        module: word_count.line_spout # 代码是word_count文件夹下的line_spout.py

    - bolt:
        name: split-words # 自定义bolt组件的名字
        module: word_count.split_words # 代码是word_count文件夹下的split_words.py
        parallelism_hint: 3 # 并发的节点数
        groupings: 
            - shuffle_grouping: line-spout # 以洗牌模式接收来自line-spout组件的数据流

    - bolt:
        name: count-words # 自定义bolt的名字
        module: word_count.count_words # 代码是word_count文件夹下的count_words.py
        parallelism_hint: 3 # 并发的节点数
        groupings:
            - fields_grouping:
                component: split-words
                fields:
                    - word # 以filed grouping模式接收来自split-words组件的数据流,field字段为word。

    - bolt:
        name: log-results # 自定义bolt的名字
        module: word_count.log_results # 代码是word_count文件夹下的log_results.py文件
        groupings:
            - global_grouping: count-words # 以global grouping方式接收来自count-words组件的数据流

这里数据的流动可以描述为

line-spout > split-words > count-words > log-results

line_spout.py
import logging
import random

from pyleus.storm import Spout

log = logging.getLogger('counter')

LINES = """
Lorem ipsum dolor sit amet, consectetur
adipiscing elit. Curabitur pharetra ante eget
nunc blandit vestibulum. Curabitur tempus mi
a risus lacinia egestas. Nulla faucibus
elit vitae dignissim euismod. Fusce ac
elementum leo, ut elementum dui. Ut
consequat est magna, eu posuere mi
pulvinar eget. Integer adipiscing, quam vitae
pretium facilisis, mi ligula viverra sapien,
nec elementum lacus metus ac mi.
Morbi sodales diam non velit accumsan
mollis. Donec eleifend quam in metus
faucibus auctor. Cras auctor sapien non
mauris vehicula, vel aliquam libero luctus.
Sed eu lobortis sapien. Maecenas eu
fringilla enim. Ut in velit nec
lectus tincidunt varius. Sed vel dictum
nunc. Morbi mollis nunc augue, eget
sagittis libero laoreet id. Suspendisse lobortis
nibh mauris, non bibendum magna iaculis
sed. Mauris interdum massa ut sagittis
vestibulum. In ipsum lacus, faucibus eu
hendrerit at, egestas non nisi. Duis
erat mauris, aliquam in hendrerit eget,
aliquam vel nibh. Proin molestie porta
imperdiet. Interdum et malesuada fames ac
ante ipsum primis in faucibus. Praesent
vitae cursus leo, a congue justo.
Ut interdum tellus non odio adipiscing
malesuada. Mauris in ante nec erat
lobortis eleifend. Morbi condimentum interdum elit,
quis iaculis ante pharetra id. In
""".strip().split('\n')


class LineSpout(Spout):

    OUTPUT_FIELDS = ["line"] # 定义要输出的字段名与数量

    def next_tuple(self):
        line = random.choice(LINES)
        log.debug(line)
        # 这里tup_id是可选的
        # 如果你要让storm跟踪你的tuple流动的话需要加上
        # storm的可靠性保证需要这个
        # (line,) 这个tuple (刚好python的属于与storm的属于对上了)
        # 对应之前设置的OUTPUT_FIELDS
        self.emit((line,), tup_id=random.randrange(999999999))


if __name__ == '__main__':
    # 这里是无法通过print的方式从终端输出调试结果的
    # 所以这里采取的方式是写临时文件
    # 实际上如果多个节点同时写一个文件会存在竞争的情况
    # 不过这里仅供调试,所以暂时忽略这个问题
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_lines.log',
        format="%(message)s",
        filemode='a',
    )

    LineSpout().run()

这个spout的作用是把一个文本分拆成行,每行作为一个tuple输出给下游的bolt。
看下word_count_lines.log的内容:

$ head word_count_lines.log
lobortis eleifend. Morbi condimentum interdum elit,
erat mauris, aliquam in hendrerit eget,
vestibulum. In ipsum lacus, faucibus eu
quis iaculis ante pharetra id. In
Ut interdum tellus non odio adipiscing
nunc. Morbi mollis nunc augue, eget
elit vitae dignissim euismod. Fusce ac
nunc. Morbi mollis nunc augue, eget
lectus tincidunt varius. Sed vel dictum
lobortis eleifend. Morbi condimentum interdum elit,
split_words.py

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('splitter')


class SplitWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = ["word"] # 定义输出的字段只有一个,名为word

    def process_tuple(self, tup):
        line, = tup.values # 接收到上游的tuple
        log.debug(line)
        for word in line.split():
            log.debug(word)
            # 这里bolt用于跟踪tuple的参数是anchors
            # 并且需要把上游的tuple传入
            # 把word传给下游
            self.emit((word,), anchors=[tup]) 


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_split_words.log',
        format="%(message)s",
        filemode='a',
    )
    SplitWordsBolt().run()

这个bolt的作用是,对收到的line拆解成word单词,并传给下游
他的输出是 word_count_split_words.log

$head word_count_split_words.log
erat mauris, aliquam in hendrerit eget,
erat
lobortis eleifend. Morbi condimentum interdum elit,
lobortis
mauris,
vestibulum. In ipsum lacus, faucibus eu
aliquam
vestibulum.
in
eleifend.
count_words.py

count-words这个组件使用了field grouping
以field grouping模式接收来自split-words组件的数据流,field字段为word
所以,相同的word单词,会流动到同一个节点。

from collections import defaultdict
from collections import namedtuple
import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('counter')

Counter = namedtuple("Counter", "word count") # 输出是两个字段


class CountWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = Counter # 输出是两个字段

    def initialize(self):
        # 在bolt启动的时候初始化
        # bolt是作为单例一直运行的
        self.words = defaultdict(int) 


    def process_tuple(self, tup):
        word, = tup.values # 获得上游的word
        self.words[word] += 1 # 计数
        log.debug("{0} {1}".format(word, self.words[word]))
        # 注意这里输出到下游的是两个字段 word 与 word的计数
        self.emit((word, self.words[word]), anchors=[tup])


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_count_words.log',
        format="%(message)s",
        filemode='a',
    )

    CountWordsBolt().run()

这个bolt的作用是对通用的单词进行计数,并且把两个字段:单词本身跟单词计数传递给下游
word_count_count_words.log的内容:

$ tail word_count_count_words.log
Ut 35894
laoreet 9472
Maecenas 8294
id. 19047
sapien, 11816
Suspendisse 9472
blandit 9599
Mauris 24100
erat 19047
a 16677
log_results.py

log-results组件用的是global grouping
在这种情况下所有来自上游的数据流都会到同一个节点,这样log写文件的话就不会有竞争问题了

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('log_results')


class LogResultsBolt(SimpleBolt):

    def process_tuple(self, tup):
        word, count = tup.values # 从上游接收两个字段
        log.debug("%s: %d", word, count)


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_results.log',
        format="%(message)s",
        filemode='a',
    )

    LogResultsBolt().run()

这个bolt的作用只是落地,这里落地的方式是写log文件(一般生产场景会落地到数据库),落地文件内容:

tail word_count_results.log
faucibus: 28713
adipiscing,: 8526
molestie: 11949
at,: 9742
Maecenas: 8392
diam: 11934
eget.: 9714
quam: 17203
mauris,: 24189
tincidunt: 9662

提交此topology到远程

pyleus build examples/word_count/pyleus_topology.yaml
pyleus submit -n NIMBUS_HOST exclamation_topology.jar

这里NIMBUS_HOST为远程的nimbus地址

pyleus的详细文档

关于spout

可以看到上面数单词的例子实际上数据流的来源是一个测试程序随机产生的,在实际生产环境中,我们一般会采用kafka来作为数据产生的源头,
一个kafka的spout定义如下
(https://github.com/Yelp/pyleus/tree/develop/examples/kafka_spout)

# 这里的定义是没有任何操作的,可以通过为它增加bolt来实现功能
name: kafka_spout_example # 自定义topology名字

topology:

    - spout:
        name: kafka-my_topic # 自定义spout名字
        type: kafka # 制定类型为kafka
        options:
            # 配置kafka的topic
            topic: my_topic

            # 配置zookeeper地址,多个用逗号隔开
            zk_hosts: zookeeper1:2181,zookeeper2:2181

            # 配置给kafka存储consumer offsets 的ZooKeeper Root path
            # 默认为: /pyleus-kafka-offsets/<topology name>
            zk_root: /pyleus-kafka-offsets/kafka_spout_example

            # Kafka consumer ID.
            # 默认为: pyleus-<topology name>
            consumer_id: pyleus-kafka_spout_example

            # 需要从某个offset开始吗
            # 默认是false.
            from_start: false

            # 如果需要从某个offset开始则定义该offset
            start_offset_time: 1398971060
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • Clojure实战(5):Storm实时计算框架 | Ji ZHANG's Bloghttp://shzhangj...
    葡萄喃喃呓语阅读 1,261评论 0 2
  • 原文链接Storm Tutorial 本人原创翻译,转载请注明出处 这个教程内容包含如何创建topologies及...
    quiterr阅读 1,603评论 0 6
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,022评论 30 60
  • 好多好多年不用文字记忆自己的生活了! 不记得自己从什么时候开始爱写写记记的,也许是从初中时吧!但却记得是从什么时候...
    a放下a阅读 272评论 1 0
  • 因为孩子的教育方式问题,又和孩子爸爸发生了分歧。我不禁想起《阿甘正传》和一句话:只有你能欣赏我。 很多家长,尤其是...
    嘉宝妈咪阅读 190评论 0 0