离线数据分析平台实战——070深入理解MapReduce 02
Shuffle阶段说明
shuffle阶段主要包括map阶段的combine、group、sort、partition以及reducer阶段的合并排序。
Map阶段通过shuffle后会将输出数据按照reduce的分区分文件的保存,
文件内容是按照定义的sort进行排序好的。
Map阶段完成后会通知ApplicationMaster,然后AM会通知Reduce进行数据的拉取,在拉取过程中进行reduce端的shuffle过程。
用户自定义Combiner
Combiner可以减少Map阶段的中间输出结果数,降低网络开销。
默认情况下是没有Combiner的。
用户自定义的Combiner要求是Reducer的子类,以Map的输出<key,value>作为Combiner的输入<key,value>和输出<key,value>,也就是说Combiner的输入和输出必须是一样的。
可以通过job.setCombinerClass设置combiner的处理类,MapReduce框架不保证一定会调用该类的方法。
用户自定义Partitoner
Partitioner是用于确定map输出的<key,value>对应的处理reducer是那个节点。
默认MapReduce任务reduce个数为1个,此时Partitioner其实没有什么效果,但是当我们将reduce个数修改为多个的时候,partitioner就会决定key所对应reduce的节点序号(从0开始)。
可以通过job.setPartitionerClass方法指定Partitioner类,默认情况下使用HashPartitioner(默认调用key的hashCode方法)。
用户自定义Group
GroupingComparator是用于将Map输出的<key,value>进行分组组合成<key,List<value>>的关键类,直白来讲就是用于确定key1和key2是否属于同一组,如果是同一组,就将map的输出value进行组合。
要求我们自定义的类实现自接口RawComparator,可以通过job.setGroupingComparatorClass方法指定比较类。
默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。
用户自定义Sort
SortComparator是用于将Map输出的<key,value>进行key排序的关键类,直白来讲就是用于确定key1所属组和key2所属组那个在前,那个在后。
要求我们自定义的类实现自接口RawComparator,可以通过job.setSortComparatorClass方法指定比较类。默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。
用户自定义Reducer的Shuffle
在reduce端拉取map的输出数据的时候,会进行shuffle(合并排序),MapReduce框架以插件模式提供了一个自定义的方式,我们可以通过实现接口ShuffleConsumerPlugin
,并指定参数mapreduce.job.reduce.shuffle.consumer.plugin.class
来指定自定义的shuffle规则,但是一般情况下,直接采用默认的类org.apache.hadoop.mapreduce.task.reduce.Shuffle
。
案例--二次排序
hadoop默认只对key进行排序,有时候我们需要将value部分也进行排序,这种情况下有两种方式实现,第一种,我们将排序放到reducer端进行,但是这种方式当数据量比较大的时候,会比较消耗内存。
那么另外一种方式就是二次排序。
二次排序的内部实行其实是先按照key+value组合的方式进行排序,然后根据单独key进行分组的一种实行方式。
要求reducer个数为2,而且奇数到第一个reducer进行处理,偶数到第二个reducer进行处理。