Spark Streaming学习六七八章笔记

通过词频统计功能学习Spark-submit的使用:

先打开一个命令窗口输入nc -lk 9999


然后在另一个窗口,spark的bin文件夹下输入

./spark-submit --master local[2] \

--class org.apache.spark.examples.streaming.NetworkWordCount \

--name NetworkWordCount \

/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999

在netcat窗口输入a a a a b b之后再spark窗口的流式输出会见到词频统计的结果。


sparkStreaming工作原理(粗粒度)

Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。


sparkStreaming工作原理(细粒度)

细粒度工作原理

首先,spark应用程序运行在driver端,driver需要在Executor(电脑)中启动Receiver接收器,接收数据流,并且分模块接收,可能还会以副本的方式存储,接收了一个周期之后,Executor会向spark应用程序返回接收情况(分块数量,副本数量等等)应用程序会将任务分发到Executor中。

DStream概念:对DStream进行操作,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。

每一个输入流Input DStreamings 都要对应一个receivers来接收它,Input DStreamings的种类:文件系统,socket传输,Kafka,Flume。

Output Operation 的种类:print(),saveAsTextFiles保存到文件系统,saveAsHadoopFiles等。

实战:spark streaming 处理socket数据

object NetworkWorldCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost",6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


在另外一个控制台里输入

nc -lk 6789

a a a a c c c d d d 

结果:


spark streaming 处理socket数据

实战:spark streaming 处理socket数据并写入mysql数据库

object ForeachRDDApp {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//前几行不变。。

    result.foreachRDD(rdd => {       //循环每一个Rdd

      rdd.foreachPartition(partitionOfRecords => {  //在一个rdd里循环每一个partition

        val connection = createCOnnection()     //获取mysql连接

        partitionOfRecords.foreach(record => {        //在每一个partition里获取一条记录

          val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"

          connection.createStatement().execute(sql)

        })

        connection.close()

      })

    })

    ssc.start()

    ssc.awaitTermination()

  }

  def createCOnnection() = {

    Class.forName("com.mysql.jdbc.Driver")

    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")

  }

结果:


结果


spark streaming从socket接收数据后根据标准过滤数据实战(黑名单例子)




//构建黑名单

object TransformApp {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

//跟前面一样

    val blacks = List("zs","ls")    //构建黑名单List

    val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))        //将List转成(zs,true)的这种RDD类型

    val lines = ssc.socketTextStream("localhost", 6789)        //lines是DSTream类型

    val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {       

//lines是这种类型的数据(20160410,zs) 根据逗号分隔后重整为(zs:20160410,zs),即为(x.split(",")(1),x))的结果,得到的结果仍然是RDD类型,transform函数是将每个Rdd拿出来操作。

      rdd.leftOuterJoin(blackRDD)   //每个rdd都跟blackRDD进行leftOuterJoin,得到(zs:[<20160410,zs>,<true>])这种类型的数据

        .filter(x=> x._2._2.getOrElse(false) != true)         //过滤,将参数的第二个中的第二个为true的过滤掉。

        .map(x =>x._2._1)      //重整,将结构变为rdd中第二个的第一个,即为<20160410,zs>

    })

    clicklog.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


在nc -lk 6789中输入

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

控制台输出


结果
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,527评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,314评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,535评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,006评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,961评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,220评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,664评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,351评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,481评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,397评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,443评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,123评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,713评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,801评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,010评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,494评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,075评论 2 341

推荐阅读更多精彩内容