Spring XD构建大数据实时计算环境

Spring XD(eXtreme Data,极限数据)是Pivotal的大数据产品。它结合了Spring BootGrails,组成Spring IO平台的执行部分。尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是一个类库或者框架,它包含带有服务器的bin目录,你可以通过命令行启动并与之交互。Spring XD 是一个统一的,分布式,可扩展的系统用于数据摄取,实时分析,批量处理和数据导出。该项目的目标是简化大数据应用的开发。

spring-xd

安装spring-xd

对于OSX用户,如果还没有Homebrew的话,请安装,然后运行:

brew tap pivotal/tap
brew install springxd

spring-xd会安装到 /usr/local/Cellar/springxd/1.0.0.M7/libexec (依赖于Spring XD的库)。

注意:如果你随后想要安装更新的版本,那么使用brew upgrade springXD就可以。

红帽或者CentOS的用户可以使用Yum来安装。

Windows用户可以下载最新的.zip文件,解压,安装到文件夹,然后把XD_HOME这个环境变量设置成安装文件夹。

使用说明

最新的官方文档:

http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/

MAC上使用spring-xd的实践

xd-singlenode

xd-shell

就可以进入xd的界面,如果失败,可以参考下面的解决方式:

if the server could not be reached, the prompt will read

server-unknown:>

You can then use the admin config server <url> to attempt to reconnect to the admin REST endpoint once you’ve figured out what went wrong:

admin config server http://localhost:9393


如果本地在配置kafka的时候已经启动了zookeeper,所以这里需要把zookeeper的地址修改为实际的地址。修改下面的文件即可:

xd/config/servers.yml:

Zookeeper properties

client connect string: host1:port1,host2:port2,...,hostN:portN

zk:
client:
connect: localhost:2181


## 1. kafka作为Sink

xd:>stream create push-to-kafka --definition "http | kafka --topic=myTopic" --deploy
xd:>Created and deployed new stream 'push-to-kafka'
xd:>http post --data "push-messages"

POST (text/plain;Charset=UTF-8) http://localhost:9000 push-messages
200 OK


## 2. kafka作为Source

Configure a stream that has kafka source with a single topic:

    xd:> stream create myKafkaSource1 --definition "kafka --zkconnect=localhost:2181 --topic=mytopic | log" --deploy
Configure a stream that has kafka source with a multiple topics:

    xd:> stream create myKafkaSource2 --definition "kafka --zkconnect=localhost:2181 --topics=mytopic1,mytopic2 | log" --deploy

有问题,目前不能正常看到输出的消息,log没有能看到producer的消息,不知道是没有收到,还是不能正确地解析!

    2016-03-30T14:36:28+0800 1.3.1.RELEASE INFO pool-23-thread-1 sink.kafkaStream - [B@d378b3
    2016-03-30T14:36:28+0800 1.3.1.RELEASE INFO pool-23-thread-1 sink.kafkaStream - [B@f23f132
    2016-03-30T14:36:28+0800 1.3.1.RELEASE INFO pool-23-thread-1 sink.kafkaStream - [B@1272c626
    2016-03-30T14:36:28+0800 1.3.1.RELEASE INFO pool-23-thread-1 sink.kafkaStream - [B@dab4d9

调整成输出到file,命令如下:

    xd:>stream create goo --definition "kafka --zkconnect=10.160.5.56:2181 --topic=test | file" --deploy
    Created and deployed new stream 'goo'

在/tmp/xd/output/goo.out里面可以看到下面的内容:

50,58,116,101,115,116,121,111,117
50,58,116,101,115,116,121,111,117
51,58,116,101,115,116,121,111,117
52,58,116,101,115,116,121,111,117
52,58,116,101,115,116,121,111,117

116,101,115,116
116,101,115,116,121,111,117,32,50

27,91,65,27,91,65,27,91,66,27,91,66

116,101,115,116,104,116,116,112,107,97,102,107,97
55,58,116,101,115,116,121,111,117

正是kafka topic=test中的内容,不过这里不是字符串的形式,而是用ascii码来表示了,看来确实kafka作为source是可行的,只是输出的内容格式需要关注。


## 3. cassandra作为sink
spring xd中已经把cassandra单独作为一个sink的module,所以可以不用jdbc的方式来搞。下面创建和发送数据的过程如下:

xd:>stream create castest --definition "http | cassandra --ingestQuery='insert into users(user_id, fname, lname) values(?,?,?)' --keyspace=mykeyspace --contactPoints=10.160.5.56" --deploy
Created and deployed new stream 'castest'
xd:>
xd:>http post http://localhost:9000 --data "{"user_id":1800,"fname":"bob", "lname":"david"}"

POST (text/plain;Charset=UTF-8) http://localhost:9000 {"user_id":1800,"fname":"bob", "lname":"david"}
200 OK

可以在cassandra的client里面看到最终的结果,可以看到我们插入的数据内容:

cqlsh:mykeyspace> select * from users ;

user_id | fname | lname
---------+-------+-------
1745 | john | smith
1746 | Bob | smith
1800 | bob | david


**注意**

***每个http source都要占用一个web 端口,默认是9000。多个流的话要为不同的流绑定不同的web 端口***

## 4. kafka作为source, spark-stream作为processor, cassandra作为sink
从上面的过程中可以看出,单独的过程都已经可以打通,将数据格式做一些转换,正好可以实验一下processor的工作方式。我们通过一个processor来把kafka的输入转换之后输出到cassandra.

因为kafka作为source的时候输出的不是字符串,是二进制数据,所以不能简单地用shell, **shell要求输入和输出都是String**。

这里我们自定义了两个module作为processor,其中message-transformer用来将kafka consumer接收到的byte数据转换成字符串。find-list是一个spark streaming,用来对message-transformer转换过之后的条目进行过滤,通过过滤的数据作为cassandra的输入,将内容存储到cassandra.

在spring-xd 中proceessor, source,sink都时一个module,系统自带了一些module,也可以根据需求实现module,一个module就是一个jar包,可以参考spring xd的文档和下面的github工程来实现自定义的module.

    https://github.com/spring-projects/spring-xd-samples

将实现好的module的jar包,通过module update命令加载到spring xd中,后续就可以使用了。通过module list命令可以查看目前spring xd环境中已有的module 列表。

xd:>module list
Source Processor Sink Job


  file                aggregator               aggregate-counter        filejdbc
  ftp                 bridge                   cassandra                filepollhdfs
  gemfire             filter                   counter                  ftphdfs
  gemfire-cq          find-list                field-value-counter      gpload
  http                header-enricher          file                     hdfsjdbc
  jdbc                http-client              ftp                      hdfsmongodb
  jms                 json-to-tuple            gauge                    jdbchdfs
  kafka               message-transformer      gemfire-json-server      sparkapp
  mail                object-to-json           gemfire-server           sqoop
  mongodb             script                   gpfdist                  timestampfile
  mqtt                scripts                  hdfs
  rabbit              shell                    hdfs-dataset
  reactor-ip          splitter                 jdbc
  reactor-syslog      transform                kafka
  sftp                tweet-test               log
  syslog-tcp                                   mail
  syslog-udp                                   mongodb
  tail                                         mqtt
  tcp                                          null
  tcp-client                                   rabbit
  time                                         redis
  trigger                                      rich-gauge
  twittersearch                                router
  twitterstream                                shell
                                               splunk
                                               tcp
                                               throughput-sampler

然后我们再来创建一个流来完成整个操作:

xd:>stream create test --definition "kafka --zkconnect=10.160.5.56:2181 --topic=test | message-transformer | find-list | cassandra --ingestQuery='insert into journey(name, date, type, credentials, credentials_no, contact, flight, depart, dest, seat, airport, carriage, station) values(?,?,?,?,?,?,?,?,?,?,?,?,?)' --keyspace=mykeyspace --contactPoints=10.160.5.56" --deploy
Created and deployed new stream 'test'


如果我们通过一个kafka的producer生产指定格式的数据,且topic为test,通过spring xd的stream可以将数据最终写入到cassandra中。最后通过cqlsh查看可以看出只有find-list processor过滤成功的数据被写入到数据库中。

cqlsh:mykeyspace> select * from journey;
name | date | type | airport | carriage | contact | credentials | credentials_no | depart | dest | flight | seat | station
------+----------+-------+----------+----------+---------+-------------+----------------+----------+------+--------+------+---------
lisi | 20160404 | plane | 首都机场 | null | 888888 | 身份证 | 1234567 | hangzhou | null | CA1986 | 15F | null
lisi | 20160412 | plane | 首都机场 | null | 888888 | 身份证 | 1234567 | hangzhou | null | CA1986 | 15F | null

(2 rows)



## 总结

以上只是较为详细地列举了spring xd环境搭建和使用的过程,事实上我们在github上创建了一个项目来完成更有应用价值的工作。从上面的过程中,可以看出,spring xd通过stream可以将不同的过程很好地连接了起来。这样应用开发人员就可以付出更少的精力来进行环境搭建和数据处理,而是更多地关注应用的逻辑。

上面processor虽然只做了对提交数据进行过滤这一个业务,但是事实上spark streaming作为processor可以通过spark mlib来支持更多更复杂的数据处理操作。这里不再发散,有兴趣的可以自己去探索。

另外:spring xd安装包本身有130多M,会比较大。在一些实际的线上环境中可能并不是很实用,如果要实现一款真正在线上运行的产品,建议可以实用spring cloud data flow.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容