spark combineByKey常用的数据操作

聚合函数combineByKey

将RDD[k,v]转化为RDD[k,c],利用该函数可以实现reduceByKey函数的功能。也可以实现类似于join的操作

参数简介

  • createCombiner: V => C

处理每个分区数据时,如果遇到key没有出现的,就会创建一个该键对应的累加器初始值,每个分区相互独立。

  • mergeValue: (C, V) => C

处理每个分区数据时,如果遇到key已经出现,则利用mergeValue进行合并处理。

  • mergeCombiners: (C, C) => C

所有分区数据处理完成后,利用mergeCombiners对各个分区的累加器进行再次合并

实现reduceByKey函数

将List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0))中的数据按照key,对value做求和计算,顺带统计次数

val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)
val combReault = rdd.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
)
//打印计算结果
combReault.collect().foreach(println)
//结果
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))

实现join操作

spark实现join操作非常简单 rddA.join(rddB)即可实现

def joinTest(sc:SparkContext): Unit ={
val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val rddB = sc.parallelize(List((1,"songshifan"),(2,"haiyang"),(3,"home")))
rddA.join(rddB).collect().foreach(println)}
//结果
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))

跟sql的left join类似

  • 下面我们尝试使用spark sql来实现join操作
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* Created by songsf on 2017/7/4.
*/
object SparkSqlTest {
    def main(args: Array[String]) {
    val spark = SparkSession
                .builder().master("local[*]")
                .appName("Spark SQL data sources example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate()
    val sc = spark.sparkContext
    val rddA = sc.parallelize((List(("1", "www"), ("1", "iteblog"), ("1", "com"),
    ("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))).map(attributes =>         Row(attributes._1, attributes._2))
    val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3",              "home"))).map(attributes => Row(attributes._1, attributes._2))
    val schemaString = "key name"
    val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val dataA = spark.createDataFrame(rddA, schema)
    dataA.createOrReplaceTempView("dataA")
    val dataB = spark.createDataFrame(rddB, schema)
    dataB.createOrReplaceTempView("dataB")
    dataA.show()
    dataB.show()
    val dataA_1 = spark.sql("select * from dataA where key = '1'").show()
    val BLeftJoinA = spark.sql("select a.*,b.name name2 from dataA a left join dataB b on a.key = b.key").show()
    spark.stop()
    }
}
//结果
+---+-------+----------+
|key|   name|     name2|
+---+-------+----------+
|  3|   good|      home|
|  1|    www|songshifan|
|  1|iteblog|songshifan|
|  1|    com|songshifan|
|  2|    bbs|   haiyang|
|  2|iteblog|   haiyang|
|  2|    com|   haiyang|
+---+-------+----------+
  • 注意:在使用spark-session时,总是会报SparkSession类找不到的错误,这是因为我们的代码是运行在本地环境中,maven在打包的时候没有把Spark-session相关的内容打到我们的package中,这一点可以将编译好的jar包解压到相应的目录下找找看。

  • 解决办法:在编辑器运行时,强制指定依赖的jar包。

  • 疑问:之前测试过1.4版本的,写好的代码不把依赖jar包打入我们的jar包中,提交集群时会报错,所以1把所有依赖包都打入jar包中,2 在执行时用--jars参数去提交机器上找jar包。现在有一种说法是运行环境已经把依赖包都放在创建的执行器中,不必再加入依赖jar包。这个需要继续研究、测试。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,478评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,825评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,482评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,726评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,633评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,018评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,168评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,320评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,264评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,288评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,995评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,587评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,909评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,284评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,862评论 2 339

推荐阅读更多精彩内容