sparkRddFunctionNote

zipWithIndex:首先基于分区索引  然后基于分区内元素索引 第一个元素是第一个分区的第一个元素 最后一个元素是最后一个分区的最后一个元素 Index的返回类型是Long类型而不是Int 如果RDD不止一个分区,则触发一个spark job,如果是根据groupBy()返回的RDD 不能保证一个分区内的元素排序,所以 如果需要确保每一个元素的索引序列,需要针对RDD使用sortByKey() 算子 进行sort  或者保存进一个文件

val seqRdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,Mary)

// (partitionIndex:00,Jim)

// (partitionIndex:01,Green)

// (partitionIndex:01,Jack)

// (partitionIndex:01,Tony)

seqRdd.zipWithIndex().collect().foreach(println)

// (Mary,0)

// (Jim,1)

// (Green,2)

// (Jack,3)

// (Tony,4)

seqRdd.zipWithIndex().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(Mary,0))

// (partitionIndex:00,(Jim,1))

// (partitionIndex:01,(Green,2))

// (partitionIndex:01,(Jack,3))

// (partitionIndex:01,(Tony,4))

zip:

将当前RDD与另外一个RDD进行zip操作,返回从两个RDD返回的第一个元素对/第二个元素对 作为key-value对,假定两个RDD 具有两个相同数据量的分区且每个分区的元素数量相同,如果存在某一个分区对,其元素数量不相同,则抛出SparkException:Can Only zip RDDs with the same number of elements in each partition

CollectAsMap:将当前RDD的key-value 对 以Map形式返回至master节点,如果一个key有多个value。只能返回一个value

seqRdd.zip(seqRdd).collectAsMap().foreach(println);seqRdd.zip(seqRdd).foreach(println);

// (Jim,Jim) (Jack,Jack) (Green,Green) (Mary,Mary) (Tony,Tony)

// (Mary,Mary)(Jim,Jim)(Green,Green)(Jack,Jack)(Tony,Tony)

zipWithUniqueId:

使用唯一的Long id 与当前RDD 进行Zips操作,在第K个分区内的元素(其分区内索引分别为0,1,2,3,4)的id分别为:k,k+n,k+2*n,k+3*n... n为分区总数,因此在这里存在一定的跳跃,但是与zipWithIndex不同,该方法不会触发spark job,其他与其一致。

seqRdd.zipWithUniqueId().collect.foreach(println)

// (Mary,0)

// (Jim,2)

// (Green,1)

// (Jack,3)

// (Tony,5)

// 诸如 combineByKey等算子,在指定参数名及其具体值时,需要注意其参数名应与声明时参数名一致。

val seq1Rdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

val seq2Rdd = sc.parallelize(List(1,2,1,2,1))

val zipRdd = seq2Rdd.zip(seq1Rdd)

combineRdd:方法向后兼容,使用一系列常用的聚合函数对RDD的每个Key 联合组成其value值

val combineRdd = zipRdd.combineByKey(createCombiner = (x:String) => List(x),mergeValue = (x:List[String],y:String) => x.:+(y),mergeCombiners = (x:List[String],y:List[String]) => x.:::(y))

combineRdd.foreach(println)

// (2,(List(Jack,Jim)))

// (1,(List(Green,Tony,Mary)))

解析:

combineByKey属于Key-value算子,做的是聚合操作,这种变换不会触发作业的提交,主要有三个参数:

createCombiner function:一个组合函数 用于将RDD[K,V] 中的V转换成一个新的值C1

mergeValue function: 合并值函数,将一个C1类型值和一个V类型值合并成一个C2类型,输入参数为(C1,V) 输出为新的C2

mergeCombiners function:合并组合器函数 用于将两个C2类型值合并成一个C3类型 输入参数为(C2,C2) 输出为C2

val createCombine = (x:String) => List(x)

val mergeValue = (x:List[String],y:String) => y :: x

// Adda an element at the beginning of this list.{{{ 1 :: List(2,3) = List(2,3).::(1) = List(1,2,3)}}}

val mergerCombiners = (x:List[String],y:List[String]) => x ::: y // Adds the elements of a given list in front of this list. {{{ List(1,2) ::: List(3,4) = List(3,4).:::List(1,2) = List(1,2,3,4)}}}

val combineRdd2 = zipRdd.combineByKey(createCombine,mergeValue,mergerCombiners)

combineRdd2.foreach(println)

// (2,(List(Jim,Jack)))

// (1,(List(Mary,Tony,Green)))

aggregateByKey:

使用combine组合函数和一个中性netural“zero value”对每一个key的多个value进行聚合操作,可以返回不同于RDD中固有数据类型V的结果类型U,一个merge操作将V 转换为U,另外一个merge操作将两个U聚合,(集合可反复遍历) 之前的数据类型转换操作是在一个分区内部进行的  之后的将U进行聚合操作是在分区之间进行的。

为避免内存泄漏,这些函数均可以被修改并且返回他们的第一个参数而不是创建一个新的U实例。 计算的时候与分区的关系很大,注意分区的作用。

val seqRdd = sc.parallelize(List(("cat",2),("dog",12),("cat",12),("cat",5),("mouse",4),("mouse",2)))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(cat,2))

// (partitionIndex:00,(dog,12))

// (partitionIndex:00,(cat,12))

// (partitionIndex:01,(cat,5))

// (partitionIndex:01,(mouse,4))

// (partitionIndex:01,(mouse,2))

val aggregateFuncRdd = seqRdd.aggregateByKey[Int](zeroValue = 0)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2)

aggregateFuncRdd.collect.foreach(println)

// (dog,12) (cat,17) (mouse,4)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,12) (cat,22) (mouse,10)  首先和当前分区内的同一key的多个value与zeroValue结合进行seqOp运算,分区0内cat 对应的value最大为12,dog为12,分区1内cat对应的value最大为5 mouse为4  再将其与ZeroValue进行计算 0-cat-Max 12 0-dog-Max 12 1-cat-Max 10 1-mouse-Max 10  然后再将两个分区的同一key的多个value进行相加  得到 (dog,12) (cat,22) (mouse,10)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,100) (cat,200) (mouse,100)

countByKey:

计算每一个Key对应的元素数量,并讲结果collect至本地Map格式,结果Map必须相对较小 才能够全部加载进Driver's 内存 处理大数据量的话  可以返回一个RDD[K,Long] 代替一个Map

seqRdd.countByKey() // self.mapValue(_ => 1L).reduceByKey(_+_).collect().toMap

seqRdd.countByValue()// map(value => (value,null)).countByKey()  当前value不是key-value对的value  而是指代整个key-value 以(value,count)对的本地Map形式返回当前RDD的不同value的个数(此处应该是一个Bug)

seqRdd.map(_._2).countByValue()

sortByKey():

进行了shuffle操作,shuffle之后被重新分区,排序靠前的元素在低序号分区,排序靠后的元素在高序号分区,每一个分区内包含一段已经排序好的元素,针对结果如果调用collect 或者save算子,将会返回或者输出一段已经排序好的记录,调用save算子时将会在文件系统内生成多个依照key进行排序的“part-x”的文件

val ze = sc.parallelize(List("dog","tiger","tac","cat","gnu","panther"),2)

val zf = ze.map(x => (x.length,x))

zf.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(5,tiger))

// (partitionIndex:00,(3,tac))

// (partitionIndex:01,(3,cat))

// (partitionIndex:01,(3,gnu))

// (partitionIndex:01,(7,panther))

zf.sortByKey().foreachPartition(x => println("sortByKey....",x.toList.mkString(",")))

// (sortByKey....,(5,tiger),(7,panther))

// (sortByKey....,(3,dog),(3,tac),(3,cat),(3,gnu))

zf.sortByKey().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(3,tac))

// (partitionIndex:00,(3,cat))

// (partitionIndex:00,(3,gnu))

// (partitionIndex:01,(5,tiger))

// (partitionIndex:01,(7,panther))

foldByKey():使用一个组合函数 和中性值 neutral “zero value” 数据类型与Value类型相同,执行时首先针对某一分区内相关元素进行组合函数计算 然后 结合 zero value 进行计算。。 然后根据Key将不同分区的组合计算结果进行组合函数运算

zf.foldByKey(zeroValue="#")((x:String,y:String) => x + y)..mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("foldByKey:"+Index,x)).toIterator}.collect().foreach(println)

// (foldByKey:1,(3,#dogtac#catgnu))

// (foldByKey:1,(7,#panther))

// (foldByKey:1,(5,#tiger))

Join:

fullOuterJoin:全连接 (key,(Some(v1),Some(v2)))

join:连接 (key,(v1,v2))

leftOuterJoin:左连接 (key,(v1,Some(v2)))

rightOuterJoin:右连接 (key,(Some(v1),v2))

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335

推荐阅读更多精彩内容