【Spark Java API】Transformation(11)—reduceByKey、foldByKey

reduceByKey


官方文档描述:

Merge the values for each key using an associative reduce function. 
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

函数原型:

def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]

def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V]

**
该函数利用映射函数将每个K对应的V进行运算。
其中参数说明如下:
**

  • func:映射函数,根据需求自定义;
  • partitioner:分区函数;
  • numPartitions:分区数,默认的分区函数是HashPartitioner。

源码分析:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {  
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

**
从源码中可以看出,reduceByKey()是基于combineByKey()实现的,其中createCombiner只是简单的转化,而mergeValue和mergeCombiners相同,都是利用用户自定义函数。reduceyByKey() 相当于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本一样。在combineByKey()中在 map 端开启 combine(),因此,reduceyByKey() 默认也在 map 端开启 combine(),这样在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD, 然后 shuffle 得到 ShuffledRDD,再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。
**

实例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);

//转化为K,V格式
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {    
    @Override    
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {        
      return new Tuple2<Integer, Integer>(integer,1);    
  }
});
JavaPairRDD<Integer,Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {    
    @Override      
    public Integer call(Integer v1, Integer v2) throws Exception {        
      return v1 + v2;    
  }
});
System.out.println(reduceByKeyRDD.collect());

//指定numPartitions
JavaPairRDD<Integer,Integer> reduceByKeyRDD2 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {    
    @Override    
    public Integer call(Integer v1, Integer v2) throws Exception {        
      return v1 + v2;    
  }
},2);
System.out.println(reduceByKeyRDD2.collect());

//自定义partition
JavaPairRDD<Integer,Integer> reduceByKeyRDD4 = javaPairRDD.reduceByKey(new Partitioner() {    
      @Override    
      public int numPartitions() {    return 2;    }    
      @Override    
      public int getPartition(Object o) {        
        return (o.toString()).hashCode()%numPartitions();    
  }
}, new Function2<Integer, Integer, Integer>() {    
    @Override      
    public Integer call(Integer v1, Integer v2) throws Exception {        
      return v1 + v2;    
  }
});
System.out.println(reduceByKeyRDD4.collect());

foldByKey


官方文档描述:

Merge the values for each key using an associative function and a neutral "zero value" which 
may be added to the result an arbitrary number of times, and must not change the result 
(e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).

函数原型:

def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]

def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V]

def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V]

**
该函数用于将K对应V利用函数映射进行折叠、合并处理,其中参数zeroValue是对V进行初始化。
具体参数如下:
**

  • zeroValue:初始值;
  • numPartitions:分区数,默认的分区函数是HashPartitioner;
  • partitioner:分区函数;
  • func:映射函数,用户自定义函数。

源码分析:

def foldByKey( zeroValue: V,  partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {  
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key  
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)  
    val zeroArray = new Array[Byte](zeroBuffer.limit)  
    zeroBuffer.get(zeroArray)  
    // When deserializing, use a lazy val to create just one instance of the serializer per task  
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()  
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))  
    val cleanedFunc = self.context.clean(func)  
    combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
}

**
从foldByKey()实现可以看出,该函数是基于combineByKey()实现的,其中createCombiner只是利用zeroValue对V进行初始化,而mergeValue和mergeCombiners相同,都是利用用户自定义函数。在这里需要注意如果实现K的V聚合操作,初始设置需要特别注意,不要改变聚合的结果。
**

实例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random rand = new Random(10);
JavaPairRDD<Integer,String> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, String>() {    
    @Override    
    public Tuple2<Integer, String> call(Integer integer) throws Exception {  
      return new Tuple2<Integer, String>(integer,Integer.toString(rand.nextInt(10)));    
  }
});

JavaPairRDD<Integer,String> foldByKeyRDD = javaPairRDD.foldByKey("X", new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + ":" + v2;    
  }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD.collect());

JavaPairRDD<Integer,String> foldByKeyRDD1 = javaPairRDD.foldByKey("X", 2, new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + ":" + v2;    
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD1.collect());

JavaPairRDD<Integer,String> foldByKeyRDD2 = javaPairRDD.foldByKey("X", new Partitioner() {    
    @Override    
    public int numPartitions() {        return 3;    }    
    @Override    
    public int getPartition(Object key) {        
      return key.toString().hashCode()%numPartitions();    
  }
}, new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + ":" + v2;    
  }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD2.collect());
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容