Problem Description
WordCount with Spark Streaming
Installing the nc command-line tool on CentOS
netcat (nc) is a simple but useful tool, often called the "Swiss Army knife" of network security. It can read and write data over network connections using the TCP or UDP protocol, and it also serves as a powerful network debugging and probing tool that can establish almost any kind of network connection you need.
On a Linux terminal it can be installed directly with yum:
[root@master01 spark]# yum install nc.x86_64
After installation completes, run nc -help in the terminal to check that the command is installed correctly:
[hadoop@master01 spark]$ nc -help
usage: nc [-46DdhklnrStUuvzC] [-i interval] [-p source_port]
[-s source_ip_address] [-T ToS] [-w timeout] [-X proxy_version]
[-x proxy_address[:port]] [hostname] [port[s]]
Command Summary:
-4 Use IPv4
-6 Use IPv6
-D Enable the debug socket option
-d Detach from stdin
-h This help text
-i secs Delay interval for lines sent, ports scanned
-k Keep inbound sockets open for multiple connects
-l Listen mode, for inbound connects
-n Suppress name/port resolutions
-p port Specify local port for remote connects
-r Randomize remote ports
-S Enable the TCP MD5 signature option
-s addr Local source address
-T ToS Set IP Type of Service
-C Send CRLF as line-ending
-t Answer TELNET negotiation
-U Use UNIX domain socket
-u UDP mode
-v Verbose
-w secs Timeout for connects and final net reads
-X proto Proxy protocol: "4", "5" (SOCKS) or "connect"
-x addr[:port] Specify proxy address and port
-z Zero-I/O mode [used for scanning]
Port numbers can be individual or ranges: lo-hi [inclusive]
OK, the installation works. Now we can use nc -l 9999 to listen on a port (the Linux host's IP is 192.168.47.141) and send some data:
[hadoop@master01 spark]$ nc -l 9999
Hello world!
How are you?
Goodbye!
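Before wiring up Spark, it can help to see what socketTextStream will actually receive: under the hood it just opens a TCP connection to the listener and reads text line by line. Below is a minimal Scala sketch of that idea (the object name NcProbe is my own, not part of the article's code), assuming nc -l 9999 is already running on 192.168.47.141:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

// Minimal sketch: connect to the nc listener and print each line,
// roughly what socketTextStream does internally.
object NcProbe {
  def main(args: Array[String]): Unit = {
    val socket = new Socket("192.168.47.141", 9999)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var line = reader.readLine()
    while (line != null) {          // readLine returns null when nc closes the connection
      println(s"received: $line")
      line = reader.readLine()
    }
    socket.close()
  }
}

Each line typed into the nc session above ("Hello world!", "How are you?", ...) should appear on this client as it is entered.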
Spark Streaming WordCount code
Once started, the program connects to port 9999 on the remote Linux host (192.168.47.141), consumes the words typed after the nc command on that host, and counts them.
package com.hx.test

import org.apache.spark.streaming.{Seconds, StreamingContext}
// Needed on Spark 1.2 for reduceByKey on a DStream of pairs
import org.apache.spark.streaming.StreamingContext._

/**
 * fileName : Test11StreamingWordCount
 * Created by 970655147 on 2016-02-12 13:21.
 */
object Test11StreamingWordCount {

  // WordCount on top of Spark Streaming
  // Environment: Windows 7 + Spark 1.2 + JDK 1.7 + Scala 2.10.4
  // 1. Start netcat [nc -l -p 9999]
  // 2. Start this program
  // 3. Type some words into the netcat console
  // 4. Switch back to this console and check the result [within 10s]
  // *******************************************
  // Each print() produces one block like:
  // -------------------------------------------
  // Time: 1455278620000 ms
  // -------------------------------------------
  // Another Information!
  // *******************************************
  // inputText : sdf sdf lkj lkj lkj lkj
  // MappedRDD[23] at count at Test11StreamingWordCount.scala:39
  // 2
  // (sdf,2), (lkj,4)
  def main(args: Array[String]): Unit = {
    // Create a StreamingContext with a local master
    // Spark Streaming needs at least two working threads
    val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))

    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = sc.socketTextStream("192.168.47.141", 9999)

    // Split each received line into words on spaces
    val words = lines.flatMap(_.split(" "))

    // Count each word within the current batch
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Print the first 10 elements of each RDD to the console
    wordCounts.print()

    sc.start()
    sc.awaitTermination()
  }
}
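Note that reduceByKey here counts words only within each 10-second batch; the counts reset on every batch. If you want cumulative counts across batches, Spark Streaming's updateStateByKey can carry state forward. Here is a hedged sketch of that variant (the update function and the checkpoint path "checkpoint-dir" are illustrative placeholders of my own; a checkpoint directory is required for stateful transformations):

// Sketch: cumulative word counts across batches with updateStateByKey.
// Insert into main() in place of the per-batch reduceByKey above.
sc.checkpoint("checkpoint-dir")   // placeholder path; required for stateful ops

// Merge this batch's counts for a word with its previous running total
val updateFunc = (newCounts: Seq[Int], state: Option[Int]) =>
  Some(newCounts.sum + state.getOrElse(0))

val totalCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
totalCounts.print()

With this variant, typing "lkj" in two different batches yields a running total such as (lkj,8) rather than a fresh count per batch.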