storm_定时机制tick使用

<h4>背景:</h4>
我们知道在java中可以有最少3钟方式来实现定时任务:1、普通thread(里面使用while循环以及sleep) 2、Timer和TimerTask 3、ScheduledExecutorService ,另外还有功能更全的Quartz框架或者是spring集成的Quartz。当然从标题就知道我们今天不是讲这些东西,而是讲讲storm中自带的定时功能使用,可以使用场景如:每分钟统计订单数据累计数据总和等。当然这其中最好的搭配就是使用kafka来做订单消息推送,目前我们只讲个本地main demo。
<h4>一、tick全解</h4>
<b>1、tick的功能</b>
Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自systemd的tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理。Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.5上测试。
<b>2、为bolt设置tick</b>
若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以Topology开头的此bolt特定的Config。
<pre>


</pre>
这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自systemd的tick stream的tick tuple,因此execute()方法可以实现如下:
<pre>

</pre>
<b>3、全局tick</b>
若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:
<pre>
</pre>
<b>当我们在整个Topology上设置tick和我们单个运算bolt上冲突时,其优先级如何呢?事实是在更小范围的bolt设置的tick优先级更高</b>
<b>4、定时精度问题</b>
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1-2ms左右。
<h4>二、代码实现</h4>
1、spout代码
<pre>
public class TickWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {"a","b","c"};
private int index = 0;
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index ++;
if(index >= sentences.length){
index = 0;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
@SuppressWarnings("rawtypes")
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
</pre>
2、bolt代码
<pre>
public class TickWordCountBolt extends BaseBasicBolt{
Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
System.err.println("TickWordCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
//模拟聚合打印结果
for (String key : counts.keySet()) {
System.err.println("key: " + key + " count: " + counts.get(key));
}
//模拟10秒钟的结果处理以后清空操作
counts.clear();
} else {
String result = tuple.getStringByField("word");
if(counts.get(result) == null){
counts.put(result, 1);
}else{
counts.put(result, counts.get(result) + 1);
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
//设置10秒发送一次tick心跳
@SuppressWarnings("static-access")
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
}
</pre>
3、main调试代码
<pre>
public class TickTest {
@SuppressWarnings("static-access")
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TickWordSpout());
//启动3个线程按word值进行分组处理
builder.setBolt("count", new TickWordCountBolt(),3).fieldsGrouping("spout", new Fields("word"));
Config conf = new Config();
//设置一个全局的Topology发送tick心跳时间,测试优先级
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
}
}
}
---------------------------------------输出结果------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a count: 3021
</br>
TickWordCount bolt: 2016-09-17 12:41:33:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a count: 3295
</br>
TickWordCount bolt: 2016-09-17 12:41:43:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a count: 3293
</br>
TickWordCount bolt: 2016-09-17 12:41:53:031
key: b count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a count: 3298
</br>
TickWordCount bolt: 2016-09-17 12:42:03:031
key: b count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a count: 3293
</pre>
从这组测试数据来看,每组都是相隔10s执行0延迟,不过在测试中也有发现延迟1-2ms的情况,还是比较精准的。
<h4>三、tick实现代码浅显分析</h4>
TopologyBuilder.setBolt
<pre>
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
</pre>
<pre>
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
</pre>
<pre>
<b> //Map conf = component.getComponentConfiguration();能够获取设置的tick发送心跳的设置</b>
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
Map conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
</pre>
tick功能的使用就讲到这里啦。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,393评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,790评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,391评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,703评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,613评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,003评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,507评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,158评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,300评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,256评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,274评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,984评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,569评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,662评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,899评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,268评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,840评论 2 339

推荐阅读更多精彩内容