7_大数据之MapReduce_2

Shuffle机制

1️⃣Shuffle机制 : Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle.

2️⃣Partition分区
3️⃣Partition分区案例实操
1.需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据

1  13736230513 192.196.100.1   www.xxx.com 2481    24681   200
2  13846544121 192.196.100.2           264 0   200
3  13956435636 192.196.100.3           132 1512    200
4  13966251146 192.168.100.1           240 0   404
5  18271575951 192.168.100.2   www.xxx.com 1527    2106    200
6  84188413    192.168.100.3   www.xxx.com 4116    1432    200
7  13590439668 192.168.100.4           1116    954 200
8  15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9  13729199489 192.168.100.6           240 0   200
10 13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11 15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12 15959002129 192.168.100.9   www.xxx.com 1938    180 500
13 13560439638 192.168.100.10          918 4938    200
14 13470253144 192.168.100.11          180 180 200
15 13682846555 192.168.100.12  www.qq.com  1938    2910    200
16 13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17 13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18 18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19 13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20 13768778790 192.168.100.17          120 120 200
21 13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22 13568436656 192.168.100.19          1116    954 200

(2)期望输出数据
  手机号136137138139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2.需求分析

3.在案例2.4的基础上,增加一个分区类

package com.xxx.mapreduce.flowsum;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

  @Override
  public int getPartition(Text key, FlowBean value, int numPartitions) {

      // 1 获取电话号码的前三位
      String preNum = key.toString().substring(0, 3);
      
      int partition = 4;
      
      // 2 判断是哪个省
      if ("136".equals(preNum)) {
          partition = 0;
      }else if ("137".equals(preNum)) {
          partition = 1;
      }else if ("138".equals(preNum)) {
          partition = 2;
      }else if ("139".equals(preNum)) {
          partition = 3;
      }

      return partition;
  }
}

4.在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.xxx.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

  public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

      // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
      args = new String[]{"e:/output1","e:/output2"};

      // 1 获取配置信息,或者job对象实例
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

      // 2 指定本程序的jar包所在的本地路径
      job.setJarByClass(FlowsumDriver.class);

      // 3 指定本业务job要使用的mapper/Reducer业务类
      job.setMapperClass(FlowCountMapper.class);
      job.setReducerClass(FlowCountReducer.class);

      // 4 指定mapper输出数据的kv类型
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(FlowBean.class);

      // 5 指定最终输出的数据的kv类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(FlowBean.class);

      // 8 指定自定义数据分区
      job.setPartitionerClass(ProvincePartitioner.class);

      // 9 同时指定相应数量的reduce task
      job.setNumReduceTasks(5);
      
      // 6 指定job的输入原始文件所在目录
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

4️⃣WritableComparable排序

1. 排序分类
2.自定义排序WritableComparable
(1)原理分析
  bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override
public int compareTo(FlowBean o) {

  int result;
      
  // 按照总流量大小,倒序排列
  if (sumFlow > bean.getSumFlow()) {
      result = -1;
  }else if (sumFlow < bean.getSumFlow()) {
      result = 1;
  }else {
      result = 0;
  }

  return result;
}

5️⃣WritableComparable排序案例实操(全排序)
1.需求
根据案例2.3产生的结果再次对总流量进行排序。
(1)输入数据 : 原始数据

1  13736230513 192.196.100.1   www.xxx.com 2481    24681   200
2  13846544121 192.196.100.2           264 0   200
3  13956435636 192.196.100.3           132 1512    200
4  13966251146 192.168.100.1           240 0   404
5  18271575951 192.168.100.2   www.xxx.com 1527    2106    200
6  84188413    192.168.100.3   www.xxx.com 4116    1432    200
7  13590439668 192.168.100.4           1116    954 200
8  15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9  13729199489 192.168.100.6           240 0   200
10 13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11 15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12 15959002129 192.168.100.9   www.xxx.com 1938    180 500
13 13560439638 192.168.100.10          918 4938    200
14 13470253144 192.168.100.11          180 180 200
15 13682846555 192.168.100.12  www.qq.com  1938    2910    200
16 13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17 13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18 18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19 13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20 13768778790 192.168.100.17          120 120 200
21 13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22 13568436656 192.168.100.19          1116    954 200

(2)第一次处理后的数据

13470253144    180 180 360
13509468723    7335    110349  117684
13560439638    918 4938    5856
13568436656    3597    25635   29232
13590439668    1116    954 2070
13630577991    6960    690 7650
13682846555    1938    2910    4848
13729199489    240 0   240
13736230513    2481    24681   27162
13768778790    120 120 240
13846544121    264 0   264
13956435636    132 1512    1644
13966251146    240 0   240
13975057813    11058   48243   59301
13992314666    3008    3720    6728
15043685818    3659    3538    7197
15910133277    3156    2936    6092
15959002129    1938    180 2118
18271575951    1527    2106    3633
18390173782    9531    2412    11943
84188413   4116    1432    5548

(3)期望输出数据

13509468723    7335    110349  117684
13736230513    2481    24681   27162
13956435636    132     1512    1644
13846544121    264     0       264
。。。 。。。

2.需求分析

3.代码实现
(1)FlowBean对象在在需求1基础上增加了比较功能

package com.xxx.mapreduce.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

  private long upFlow;
  private long downFlow;
  private long sumFlow;

  // 反序列化时,需要反射调用空参构造函数,所以必须有
  public FlowBean() {
      super();
  }

  public FlowBean(long upFlow, long downFlow) {
      super();
      this.upFlow = upFlow;
      this.downFlow = downFlow;
      this.sumFlow = upFlow + downFlow;
  }

  public void set(long upFlow, long downFlow) {
      this.upFlow = upFlow;
      this.downFlow = downFlow;
      this.sumFlow = upFlow + downFlow;
  }

  public long getSumFlow() {
      return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
      this.sumFlow = sumFlow;
  }   

  public long getUpFlow() {
      return upFlow;
  }

  public void setUpFlow(long upFlow) {
      this.upFlow = upFlow;
  }

  public long getDownFlow() {
      return downFlow;
  }

  public void setDownFlow(long downFlow) {
      this.downFlow = downFlow;
  }

  /**
   * 序列化方法
   * @param out
   * @throws IOException
   */
  @Override
  public void write(DataOutput out) throws IOException {
      out.writeLong(upFlow);
      out.writeLong(downFlow);
      out.writeLong(sumFlow);
  }

  /**
   * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
   * @param in
   * @throws IOException
   */
  @Override
  public void readFields(DataInput in) throws IOException {
      upFlow = in.readLong();
      downFlow = in.readLong();
      sumFlow = in.readLong();
  }

  @Override
  public String toString() {
      return upFlow + "\t" + downFlow + "\t" + sumFlow;
  }

  @Override
  public int compareTo(FlowBean o) {
      
      int result;
      
      // 按照总流量大小,倒序排列
      if (sumFlow > bean.getSumFlow()) {
          result = -1;
      }else if (sumFlow < bean.getSumFlow()) {
          result = 1;
      }else {
          result = 0;
      }

      return result;
  }
}

(2)编写Mapper

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

  FlowBean bean = new FlowBean();
  Text v = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

      // 1 获取一行
      String line = value.toString();
      
      // 2 截取
      String[] fields = line.split("\t");
      
      // 3 封装对象
      String phoneNbr = fields[0];
      long upFlow = Long.parseLong(fields[1]);
      long downFlow = Long.parseLong(fields[2]);
      
      bean.set(upFlow, downFlow);
      v.set(phoneNbr);
      
      // 4 输出
      context.write(bean, v);
  }
}

(3)编写Reducer

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

  @Override
  protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
      // 循环输出,避免总流量相同情况
      for (Text text : values) {
          context.write(text, key);
      }
  }
}

(4)编写Driver

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver {

  public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

      // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
      args = new String[]{"e:/output1","e:/output2"};

      // 1 获取配置信息,或者job对象实例
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

      // 2 指定本程序的jar包所在的本地路径
      job.setJarByClass(FlowCountSortDriver.class);

      // 3 指定本业务job要使用的mapper/Reducer业务类
      job.setMapperClass(FlowCountSortMapper.class);
      job.setReducerClass(FlowCountSortReducer.class);

      // 4 指定mapper输出数据的kv类型
      job.setMapOutputKeyClass(FlowBean.class);
      job.setMapOutputValueClass(Text.class);

      // 5 指定最终输出的数据的kv类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(FlowBean.class);

      // 6 指定job的输入原始文件所在目录
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
      // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

6️⃣WritableComparable排序案例实操(区内排序)
1.需求 : 要求每个省份手机号输出的文件中按照总流量内部排序。
2.需求分析 : 基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

3.案例实操
(1)增加自定义分区类

package com.xxx.mapreduce.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

  @Override
  public int getPartition(FlowBean key, Text value, int numPartitions) {
      
      // 1 获取手机号码前三位
      String preNum = value.toString().substring(0, 3);
      
      int partition = 4;
      
      // 2 根据手机号归属地设置分区
      if ("136".equals(preNum)) {
          partition = 0;
      }else if ("137".equals(preNum)) {
          partition = 1;
      }else if ("138".equals(preNum)) {
          partition = 2;
      }else if ("139".equals(preNum)) {
          partition = 3;
      }

      return partition;
  }
}

(2)在驱动类中添加分区类

// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);

// 设置Reducetask个数
job.setNumReduceTasks(5);

7️⃣Combiner合并

自定义Combiner实现步骤
(a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

     // 1 汇总操作
      int count = 0;
      for(IntWritable v :values){
          count += v.get();
      }

     // 2 写出
      context.write(key, new IntWritable(count));
  }
}

(b)在Job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

8️⃣Combiner合并案例实操
1.需求
  统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
(1)数据输入

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2)期望输出数据
期望:Combiner输入数据多,输出时经过合并,输出数据降低。
2.需求分析

3.案例实操-方案一
1)增加一个WordcountCombiner类继承Reducer

package com.xxx.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

IntWritable v = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

      // 1 汇总
      int sum = 0;

      for(IntWritable value :values){
          sum += value.get();
      }

      v.set(sum);

      // 2 写出
      context.write(key, v);
  }
}

2)在WordcountDriver驱动类中指定Combiner

// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);

4.案例实操-方案二
1)将WordcountReducer作为CombinerWordcountDriver驱动类中指定

// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordcountReducer.class);
  1. 运行程序见下图所示
    9️⃣GroupingComparator分组(辅助排序)
    Reduce阶段的数据根据某一个或几个字段进行分组。
    分组排序步骤:
    (1)自定义类继承WritableComparator
    (2)重写compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
      // 比较的业务逻辑
      return result;
}

(3)创建一个构造将比较对象的类传给父类

protected OrderGroupingComparator() {
      super(OrderBean.class, true);
}

🔟GroupingComparator分组案例实操
1.需求 : 有如下订单数据

现在需要求出每一个订单中最贵的商品。
(1)输入数据

0000001    Pdt_01  222.8
0000002    Pdt_05  722.4
0000001    Pdt_02  33.8
0000003    Pdt_06  232.8
0000003    Pdt_02  33.8
0000002    Pdt_03  522.8
0000002    Pdt_04  122.4

(2)期望输出数据

1  222.8
2  722.4
3  232.8

2.需求分析
(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce
(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如下图所示。

3.代码实现
(1)定义订单信息OrderBean

package com.xxx.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

  private int order_id; // 订单id号
  private double price; // 价格

  public OrderBean() {
      super();
  }

  public OrderBean(int order_id, double price) {
      super();
      this.order_id = order_id;
      this.price = price;
  }

  @Override
  public void write(DataOutput out) throws IOException {
      out.writeInt(order_id);
      out.writeDouble(price);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
      order_id = in.readInt();
      price = in.readDouble();
  }

  @Override
  public String toString() {
      return order_id + "\t" + price;
  }

  public int getOrder_id() {
      return order_id;
  }

  public void setOrder_id(int order_id) {
      this.order_id = order_id;
  }

  public double getPrice() {
      return price;
  }

  public void setPrice(double price) {
      this.price = price;
  }

  // 二次排序
  @Override
  public int compareTo(OrderBean o) {

      int result;

      if (order_id > o.getOrder_id()) {
          result = 1;
      } else if (order_id < o.getOrder_id()) {
          result = -1;
      } else {
          // 价格倒序排序
          result = price > o.getPrice() ? -1 : 1;
      }

      return result;
  }
}

(2)编写OrderSortMapper

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

  OrderBean k = new OrderBean();
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
      // 1 获取一行
      String line = value.toString();
      
      // 2 截取
      String[] fields = line.split("\t");
      
      // 3 封装对象
      k.setOrder_id(Integer.parseInt(fields[0]));
      k.setPrice(Double.parseDouble(fields[2]));
      
      // 4 写出
      context.write(k, NullWritable.get());
  }
}

(3)编写OrderSortGroupingComparator

package com.xxx.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

  protected OrderGroupingComparator() {
      super(OrderBean.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {

      OrderBean aBean = (OrderBean) a;
      OrderBean bBean = (OrderBean) b;

      int result;
      if (aBean.getOrder_id() > bBean.getOrder_id()) {
          result = 1;
      } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
          result = -1;
      } else {
          result = 0;
      }

      return result;
  }
}

(4)编写OrderSortReducer

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

  @Override
  protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)        throws IOException, InterruptedException {
      
      context.write(key, NullWritable.get());
  }
}

(5)编写OrderSortDriver

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

  public static void main(String[] args) throws Exception, IOException {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
      args  = new String[]{"e:/input/inputorder" , "e:/output1"};

      // 1 获取配置信息
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      // 2 设置jar包加载路径
      job.setJarByClass(OrderDriver.class);

      // 3 加载map/reduce类
      job.setMapperClass(OrderMapper.class);
      job.setReducerClass(OrderReducer.class);

      // 4 设置map输出数据key和value类型
      job.setMapOutputKeyClass(OrderBean.class);
      job.setMapOutputValueClass(NullWritable.class);

      // 5 设置最终输出数据的key和value类型
      job.setOutputKeyClass(OrderBean.class);
      job.setOutputValueClass(NullWritable.class);

      // 6 设置输入数据和输出数据路径
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 8 设置reduce端的分组
  >job.setGroupingComparatorClass(OrderGroupingComparator.class);

      // 7 提交
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

MapTask工作机制

(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.outN表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。


ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
1.设置ReduceTask并行度(个数)
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:job.setNumReduceTasks(4); // 默认值是1,手动设置为4;
2.实验:测试ReduceTask多少合适
(1)实验环境:1Master节点,16Slave节点:CPU:8GHZ,内存: 2G
(2)实验结论:(数据量为1GB
3.注意事项


OutputFormat数据输出

1️⃣OutputFormat接口实现类

2️⃣自定义OutputFormat
3️⃣自定义OutputFormat案例实操
1.需求
 过滤输入的log日志,包含xxx的网站输出到e:/xxx.log,不包含xxx的网站输出到e:/other.log
(1)输入数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com 
http://www.xxx.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

(2)期望输出数据
xxx.log : http://www.xxx.com
other.log :

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

2.需求分析

3.案例实操
(1)编写FilterMapper

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

      // 写出
      context.write(value, NullWritable.get());
  }
}

(2)编写FilterReducer

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

Text k = new Text();

  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context)     throws IOException, InterruptedException {

    // 1 获取一行
      String line = key.toString();

      // 2 拼接
      line = line + "\r\n";

      // 3 设置key
      k.set(line);

      // 4 输出
      context.write(k, NullWritable.get());
  }
}

(3)自定义一个OutputFormat

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

  @Override
  public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)         throws IOException, InterruptedException {

      // 创建一个RecordWriter
      return new FilterRecordWriter(job);
  }
}

(4)编写RecordWriter

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

  FSDataOutputStream xxxOut = null;
  FSDataOutputStream otherOut = null;

  public FilterRecordWriter(TaskAttemptContext job) {

      // 1 获取文件系统
      FileSystem fs;

      try {
          fs = FileSystem.get(job.getConfiguration());

          // 2 创建输出文件路径
          Path xxxPath = new Path("e:/xxx.log");
          Path otherPath = new Path("e:/other.log");

          // 3 创建输出流
          xxxOut = fs.create(xxxPath);
          otherOut = fs.create(otherPath);
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

  @Override
  public void write(Text key, NullWritable value) throws IOException, InterruptedException {

      // 判断是否包含“xxx”输出到不同文件
      if (key.toString().contains("xxx")) {
          xxxOut.write(key.toString().getBytes());
      } else {
          otherOut.write(key.toString().getBytes());
      }
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {

      // 关闭资源
      IOUtils.closeStream(xxxOut);
      IOUtils.closeStream(otherOut);  }
}

(5)编写FilterDriver

package com.xxx.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

  public static void main(String[] args) throws Exception {

      // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
      args = new String[] { "e:/input/inputoutputformat", "e:/output2" };

      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      job.setJarByClass(FilterDriver.class);
      job.setMapperClass(FilterMapper.class);
      job.setReducerClass(FilterReducer.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(NullWritable.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

      // 要将自定义的输出格式组件设置到job中
      job.setOutputFormatClass(FilterOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path(args[0]));

      // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
      // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容