Storm 是一个分布式的,可靠的,容错的数据流处理系统。下面我将分别从storm的整体架构以及部分原理进行讲解。
一、基本的概念
storm中服务器节点分为主节点和从节点,Nimbus为主节点和Supervisor为从节点。以及若干组件构成。下面为对一些术语进行简单的介绍:
Nimbus:主节点,是一个调度中心,负责分发任务
Supervisor:从节点,任务执行的地方
Worker:任务工作进程,一个Supervisor中可以有多个Worker。
Executor:Worker进程在执行任务时,会启动多个Executor线程
Topology:任务的抽象概念。由于storm是流式计算的框架,它的数据流和拓扑图很像,所以它的任务就叫topology。
Spout:从数据源获取数据并进行分发。
Bolt:得到Spout或者上一个Bolt的数据,然后进行处理后交给下一个Bolt处理。
Tuple:在storm中,一条数据可以理解为是一个Tuple。
二、storm的架构
任务提交处理流程
Nimbus是调度中心,Supervisor是任务执行的地方。Supervisor上面有若干个Worker,每个Worker都有自己的端口,Worker可以理解为一个进程。另外,每个Worker中还可以运行若干个线程。
当客户端向storm集群提交一个Topology时,这里的提交就是在集群上通过命令storm jar xxx
启动topology。如果我们是在Supervisor节点上执行storm jar xxx
,那么Supervisor会将jar包拷贝到Nimbus,之后Nimbus对Topology进行调度。
Nimbus会根据Topology所需要的Worker进行分配,将其分配到各个Supervisor的节点上执行。
现在假设我们我们有4个Supervisor节点,每个Supervisor都配置4个Worker。这是我们提交了一个Topology,需要4个Worker,那可能的分配情况可能如下图所示:
storm中的数据流
启动完Topology后,相关组件就开始运行起来了。在Storm中,Spout组件主要用来从数据源拉取数据,形成一个Tuple后转交给Bolt处理。Bolt接受到Tuple处理完后,可以选择继续交给下一个Bolt处理,也可以选择不往下传。这样数据以Tuple的形式一个接一个的往下执行,就形成了一个拓扑数据流。
storm数据在组件间的流向如下图所示:
三、Storm的并发度
在Storm中,Worker不是组件执行的最小单位。Executor才是,Executor可以理解为是一个线程。我们在创建topology的时候,可以设置执行spout的线程数和bolt的线程数。
假设spout和bolt的线程数加起来设置了8个,然后设置了2个worker,那么这8个线程可能就会随机分配到2个worker中,可能一个worker3个,一个worker5个。也有可能各自分配4个。如下图所示:
四、数据的Grouping策略
在实际应用中,Bolt组件的实例可能有多个,Tuple在流向Bolt时,选择哪个Bolt实例的策略就是grouping策略。
下面是Storm中的6种Grouping策略:
- Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配。
- Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
- All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
- Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
- Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。
- Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)
五、消息的可靠性保证 —— ack机制
一条数据在Spout中形成一个Tuple,然后交给一个个Bolt执行,那我们怎么保证这个Tuple被完整的执行了呢?这里的完整执行说的是这个Tuple必须在后面的每一个Bolt都成功处理,假设在一个Bolt中发生异常导致失败,这就不能算完整处理。
为了保证消息处理过程中的可靠性,storm使用了ack机制。storm会专门启动若干acker线程,来追踪tuple的处理过程。acker线程数量可以设置。
每一个Tuple在Spout中生成的时候,都会分配到一个64位的messageId。通过对messageId进行哈希我们可以执行要对哪个acker线程发送消息来通知它监听这个Tuple。
acker线程收到消息后,会将发出消息的Spout和那个messageId绑定起来。然后开始跟踪该tuple的处理流程。如果这个tuple全部都处理完,那么acker线程就会调用发起这个tuple的那个spout实例的ack()方法。如果超过一定时间这个tuple还没处理完,那么acker线程就会调用对应spout的fail()方法,通知spout消息处理失败。spout组件就可以重新发送这个tuple。
从上面的介绍我们知道了,tuple数据的流向会形成一个拓扑图,也可以理解成是一个tuple树。这个拓扑图的节点可能会有很多个,如果要把这些节点全部保存起来,处理大量的数据时势必会造成内存溢出。
对于这个难题,storm使用了一种非常巧妙的方法,使用20个字节就可以追踪一个tuple是否被完整的执行。这也是storm的一个突破性的技术。
ack机制的具体原理
我们都知道,自己异或自己,结果肯定为零( a ^ a = 0)。ack中就利用这个特性
- acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下。注意,这里的tuple的id不是spout-tuple的id,和我们上面理解的messageId不是一个概念,要区分一下,是每个新生产的tuple的id,这个tupleId是随机生成的64位比特值
- 之后把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。
举个例子,比如发射了某个tuple,就 ack-val ^ tupleId,然后ack了某个tuple,就再ack-val ^ tupleId,这样,ack-val 最终又变成了0,说明tuple已经全部处理成功了。
六、Storm的HA保证——高可用性保证
1. 数据方面的高可用
使用ack机制保证数据处理的高可用
2. Worker进程挂了怎么办?
Supervisor会自动重启worker线程。
3. Supervisor节点失效了怎么办?
可以在其他节点重启该supervisor任务。
4. Nimbus挂了怎么办?
在storm1.0之前,Nimbus是不支持HA的。Nimbus如果挂了,重启Nimbus进程就可以了,不会影响到现有topology的运行。
因为Nimbus只是一个调度中心,Nimbus和Supervisor的状态都保存在本地文件和ZooKeeper,因此他们进程可以随便杀死,然后重启,不会影响到Worker进程的运行。
另外,Nimbus的作用在就是在拓扑任务开始阶段,负责将任务提交到集群,后期负责拓扑任务的管理,比如任务查看,终止等操作。在通常情况下,nimbus的任务压力并不会很大,在自然情况下不会出现宕机的情况。
storm1.0后Nimbus的HA策略还没有具体研究过,有兴趣的小伙伴可自行前往官网查看文档。http://storm.apache.org/releases/1.2.1/nimbus-ha-design.html
七、总结
Storm的架构及原理整体理解起来不算很难,但很多细节还是需要在实践中才能发现。有兴趣的小伙伴可以去读读storm的源码,storm源码大多数都是用Clojure实现,对Clojure语言不熟悉的朋友可以去看一下JStorm的源码实现。这是阿里基于Storm用java实现的框架,据说更加稳定高效。
最后,哪里有说的不对的地方。敬请支出,感激不尽。