一、概念
单进程版与Storm版的流计算实现有许多相似的概念,其中最重要的包括:Topology、Spout、Bolt。Topology是一个由Spout节点和Bolt节点组成的有向无环图(或称有方向的树),一个轻应用可以有一个或多个Topology。Spout是数据的来源,它是Topology的根节点,每个Topology只有一个Spout。Bolt是真正负责处理业务逻辑的节点。
Storm将父节点传输给子节点的数据称为Tuple,单进程版的流计算与此对应的概念是BoltParameter。一组Tuple或BoltParameter组成Stream,Stream是一个抽象概念,没有具体的实现。
单进程版相当于Storm版的简化,有一些Storm版的概念这里不会有,比如:
- 分组(grouping)
- Worker
- Task
- Reliability
上面列出的概念基本都与Storm的并行处理有关,单进程版的流计算不存在并发问题,也就没有这些概念。
在编写Storm版的流计算程序时,很多非业务逻辑的功能是Storm负责维护和处理的,而单进程版的程序需要自己来处理,这引入了一些新的概念:
- SpoutProcessor,这个类负责维护Spout节点
- BoltProcessor,这个类负责维护Bolt节点
下文将详细解释SpoutProcessor和BoltProcessor。
二、Topology的实现
之前提到Topology树是由Spout和Bolt组成,实际上Spout和Bolt里边是业务逻辑相关的定义,真正让它们组成一棵树的是SpoutProcessor和BoltProcessor,这两个类都实现了ProcessNode接口,该接口定义如下:
public interface ProcessNode {
/**
* 提取本处理结点的名字
*/
String getName();
/**
* 提取本结点的Id,Id用来记录节点间的关系,全局唯一
*/
String getId();
/**
* 向本处理结点增加一个儿子结点
*/
void addchild(ProcessNode child);
/**
* 调用本处理结点的处理逻辑对数据进行处理
*/
void run(Object param);
}
SpoutProcesser的定义:
public class SpoutProcessor implements ProcessNode {
private static Logger logger = LoggerFactory.getLogger(SpoutProcessor.class);
/**
* 本拓扑待处理的数据队列
*/
KafkaDataQueue queue = new KafkaDataQueue();
/**
* 数据生成器
*/
private StreamSpout spout;
/**
* 本处理器对应的下一代处理器
*/
private LinkedList<ProcessNode> childrens;
//后面的代码省略
SpoutProcesser的第一个变量是KafkaDataQueue,这个数据队列由KafkaThread类负责写入数据(它的数据来源是Kafka队列),由ProcessThread负责读取数据并处理。关于ProcessThread等线程类后边详细介绍。
SpoutProcesser的第二个变量是StreamSpout,这就是Storm中Spout的对应实现,是拓扑树的根节点。
SpoutProcesser第三个变量:private LinkedList<ProcessNode> childrens;
,这个变量记录了它的下一级节点有哪些,这是组成Topology树的关键。
public class BoltProcessor implements ProcessNode {
/**
* 本处理器对应的Bolt
*/
private Bolt bolt;
/**
* 本处理器对应的下一代处理器
*/
private LinkedList<ProcessNode> childrens;
//后面的代码省略
BoltProcessor也是一样,维护了Bolt的下一级节点列表。
三、线程
单进程版流计算有四个线程:
- KafkaThread,如前所述,它负责读取Kafka队列,并把数据放到SpoutProcesser的KafkaDataQueue。每个SpoutProcesser都会收到一份完全相同的数据的拷贝,有点类似于Storm的AllGrouping分组方式。全局只有一个KafkaThread。
- ProcessThread,它通过调用SpoutProcesser的实例来处理KafkaDataQueue中的数据,每个Topology对应一个ProcessThread。
- OutputThread,负责把流计算的结果存储到MongoDB,全局只有一个OutputThread。
- ShutdownHookThread,当进程退出时,如kill -15 程序退出时,把未处理的kafka数据退回kafka队列,把已经处理生成的结果存入mongo,减少数据丢失。ShutdownListener类实现了ApplicationListener接口,当监听到ContextClosedEvent事件时启动ShutdownHookThread。
四、流计算业务逻辑的实现
关于Groovy脚本是如何在Java程序中运行的,可以参考《Groovy脚本使用方法》、《baas系统脚本说明》。这篇文档主要介绍StreamSpout、ConvertBolt和StatBolt的实现(AlarmBolt与ConvertBolt类似,不再重复介绍)。
(一)StreamSpout
StreamSpout是根节点的实现,它实现了Spout接口:
public interface Spout {
/**
* 得到本Spout的名字
*/
String getName();
/**
* 得到本Spout的Id
*/
String getId();
/**
* 准备本Bolt
*/
void prepare();
/**
* 提取设备档案操作对象
*/
IArchives getArchives();
/**
* 设置设备档案操作对象
*/
void setArchives(IArchives archives);
/**
* 执行Spout内部的判断逻辑,判别是否应该交由本Spout进行处理
*/
BoltParameter execute(SpoutParameter spoutParameter);
}
prepare方法只在初始化的时候执行一次,它负责做一些准备工作。
execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回BoltParameter传递给下一级节点。
getArchives方法返回一个IArchives接口,通过这个接口提供的方法可以获取设备档案。
StreamSpout的定义(核心片段,非完整代码):
public class StreamSpout implements Spout {
/**
* 本Spout的配置
*/
private SpoutConfig config;
/**
* 访问设备档案的对象
*/
private Archives archives;
@Override
public void prepare() {
}
@Override
public BoltParameter execute(SpoutParameter spoutParameter) {
//省略具体实现
}
//省略后面的代码
第一个成员变量是SpoutConfig,这就是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
第二个成员变量Archives,这个对象包含一个deviceId成员变量,当StreamSpout收到数据时,execute方法会给deviceId赋值,在后面的节点中将用来获取档案信息。
StreamSpout的prepare方法目前为空,没有任何准备工作要做。
execute方法首先判断数据流名称是否和用户配置的一致,然后构造BoltParameter对象(由流计算的上下文、内置输入对象、档案操作对象组成),返回该对象给下一级节点。
(二)ConvertBolt
ConvertBolt的定义(核心片段,非完整代码):
public class ConvertBolt extends BaseBolt {
/**
* 本Bolt的配置
*/
private ConvertBoltConfig config;
/**
* 脚本对象
*/
private IConvertProcess process;
/**
* 把数据保存到Mongo的队列
*/
private OutputQueue outputQueue;
//省略非业务逻辑私有变量
@Override
public void prepare() {
//省略具体实现
}
@Override
public List<BoltParameter> execute(BoltParameter parameter) {
//省略具体实现
}
}
ConvertBolt继承了BaseBolt,后者非常简单,不影响整体理解,细节请阅读源代码。
第一个成员变量是ConvertBoltConfig,和SpoutConfig一样,这是用户在轻应用平台上所做的配置,由单进程流计算的入口类Executor从数据库中读取填充。
第二个成员变量IConvertProcess,用来引用Groovy脚本的实例,执行Groovy脚本的时候用到。
第三个成员变量OutputQueue,OutputThread会将这个队列的数据存储到MongoDB。
prepare方法只在初始化的时候执行一次,它负责做一些准备工作,例如解析ConvertBoltConfig并加载Groovy脚本。
execute方法每收到一个数据就会运行一次,处理真正的业务逻辑,数据通过参数BoltParameter传递进来,通过返回List<BoltParameter>传递给下一级节点。
(三)StatBolt
StatBolt的定义(核心片段,非完整代码):
public class StatBolt extends BaseBolt {
/**
* 统计单元的配置
*/
private StaticsBoltConfig config;
/**
* 统计脚本对象
*/
private IStatProcess statProcessor;
/**
* 统计缓存对象
*/
private Caches caches;
/**
* 统计过程中访问Redis的对象,用于保存和提取中间结果
*/
private RedisClient redisClient;
/**
* 把数据存储到Mongo中的队列
*/
private OutputQueue outputQueue;
@Override
public void prepare() {
//省略具体实现
}
@Override
public List<BoltParameter> execute(BoltParameter parameter) {
//省略具体实现
}
}
StatBolt和ConvertBolt结构基本一致,相同的部分不再重复说明。
成员变量Caches是内置的统计计算所需的缓存类,在prepare方法中初始化,它实现了ICaches接口,包括单个设备统计用到的group函数和全局统计用到的group(Object group)。虽然名为Caches,但它更多的是作为计算所需的内置对象,实际的缓存功能是由它内部的RedisClient类实现。
execute方法判断是否到达输出时间(上一次输出时间可通过Caches获取),如果到达则执行输出脚本,如果没有到达则执行计算脚本。
五、缓存的实现
采用了两级缓存机制:Redis和Ehcache,前者是远程缓存,后者是本地缓存。事实上正是由于远程缓存性能不够好才引入了本地缓存,但另一方面,如果只使用本地缓存,程序意外终止时会丢失数据,所以两者结合使用。
CacheUtils类提供了Ehcache的访问,RedisClient类除了提供了Redis的访问,还包含CacheUtils的调用并提供其他类所需的接口方法。
缓存的细节信息见下表:
缓存类型 | 最大数量级 | key值构成 | Ehcache缓存名字 | 备注 |
---|---|---|---|---|
设备档案 | 设备数量 | arch-、archiveId、deviceId | archiveCache | |
单个统计中间结果 | 设备数量*统计节点数量 | cache-、statId、deviceId | midStatCache | 单个统计时缓存数量远大于全局统计 |
单个统计的上一次输出时间 | 设备数量*统计节点数量 | stat_last_time-、statId、deviceId | lastStatOutputCache | 单个统计时缓存数量远大于全局统计 |
全局统计中间结果 | 档案字段数量*统计节点数量 | cache-、statId、group | midStatCache | group是档案字段的值 |
全局统计的上一次输出时间 | 档案字段数量*统计节点数量 | stat_last_time-、statId、group | lastStatOutputCache | group是档案字段的值 |
上一次告警输出的时间(数据来源为非统计节点) | 设备数量*告警节点数量 | alarm_last-、alarmId、deviceId | lastAlarmCache | |
上一次告警输出的时间(数据来源为单个统计节点) | 设备数量*告警节点数量 | alarm_last-、alarmId、deviceId | lastAlarmCache | key值看起来和上一行相同,但alarmId实际会不一样。 |
上一次告警输出的时间(数据来源为全局统计节点) | 档案字段数量*告警节点数量 | alarm_last-、alarmId、group | lastAlarmCache | group是档案字段的值 |
全局统计的维度值列表 | 档案字段数量*统计节点数量 | statId | statDimensionsCache | value是Set<String>,Set里边放的group;单线程版本的流计算是存储在Redis |
六、程序的入口:Executor类
Executor类的init方法是单进程版流计算程序的入口,init方法主要做了两件事:创建Topology树和启动各个线程,当init方法执行完毕之后,单进程流计算程序就开始读取Kafka队列中的数据并处理,这个过程将会一直运行下去,只能用户手动停止。