一 Shuffle
机制
1️⃣
Shuffle
机制 :Map
方法之后,Reduce
方法之前的数据处理过程称之为Shuffle
.2️⃣Partition
分区3️⃣Partition
分区案例实操
1.需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据1 13736230513 192.196.100.1 www.xxx.com 2481 24681 200 2 13846544121 192.196.100.2 264 0 200 3 13956435636 192.196.100.3 132 1512 200 4 13966251146 192.168.100.1 240 0 404 5 18271575951 192.168.100.2 www.xxx.com 1527 2106 200 6 84188413 192.168.100.3 www.xxx.com 4116 1432 200 7 13590439668 192.168.100.4 1116 954 200 8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200 9 13729199489 192.168.100.6 240 0 200 10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200 11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200 12 15959002129 192.168.100.9 www.xxx.com 1938 180 500 13 13560439638 192.168.100.10 918 4938 200 14 13470253144 192.168.100.11 180 180 200 15 13682846555 192.168.100.12 www.qq.com 1938 2910 200 16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200 17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404 18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200 19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200 20 13768778790 192.168.100.17 120 120 200 21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200 22 13568436656 192.168.100.19 1116 954 200
(2)期望输出数据
手机号136
、137
、138
、139
开头都分别放到一个独立的4
个文件中,其他开头的放到一个文件中。
2.需求分析3.在案例2.4的基础上,增加一个分区类package com.xxx.mapreduce.flowsum; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 获取电话号码的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判断是哪个省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
4.在驱动函数中增加自定义数据分区设置和
ReduceTask
设置package com.xxx.mapreduce.flowsum; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowsumDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[]{"e:/output1","e:/output2"}; // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(FlowsumDriver.class); // 3 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 4 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 8 指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner.class); // 9 同时指定相应数量的reduce task job.setNumReduceTasks(5); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
4️⃣
WritableComparable
排序1. 排序分类2.自定义排序WritableComparable
(1)原理分析
bean
对象做为key
传输,需要实现WritableComparable
接口重写compareTo
方法,就可以实现排序。@Override public int compareTo(FlowBean o) { int result; // 按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; }else if (sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }
5️⃣
WritableComparable
排序案例实操(全排序)
1.需求
根据案例2.3产生的结果再次对总流量进行排序。
(1)输入数据 : 原始数据1 13736230513 192.196.100.1 www.xxx.com 2481 24681 200 2 13846544121 192.196.100.2 264 0 200 3 13956435636 192.196.100.3 132 1512 200 4 13966251146 192.168.100.1 240 0 404 5 18271575951 192.168.100.2 www.xxx.com 1527 2106 200 6 84188413 192.168.100.3 www.xxx.com 4116 1432 200 7 13590439668 192.168.100.4 1116 954 200 8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200 9 13729199489 192.168.100.6 240 0 200 10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200 11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200 12 15959002129 192.168.100.9 www.xxx.com 1938 180 500 13 13560439638 192.168.100.10 918 4938 200 14 13470253144 192.168.100.11 180 180 200 15 13682846555 192.168.100.12 www.qq.com 1938 2910 200 16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200 17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404 18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200 19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200 20 13768778790 192.168.100.17 120 120 200 21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200 22 13568436656 192.168.100.19 1116 954 200
(2)第一次处理后的数据
13470253144 180 180 360 13509468723 7335 110349 117684 13560439638 918 4938 5856 13568436656 3597 25635 29232 13590439668 1116 954 2070 13630577991 6960 690 7650 13682846555 1938 2910 4848 13729199489 240 0 240 13736230513 2481 24681 27162 13768778790 120 120 240 13846544121 264 0 264 13956435636 132 1512 1644 13966251146 240 0 240 13975057813 11058 48243 59301 13992314666 3008 3720 6728 15043685818 3659 3538 7197 15910133277 3156 2936 6092 15959002129 1938 180 2118 18271575951 1527 2106 3633 18390173782 9531 2412 11943 84188413 4116 1432 5548
(3)期望输出数据
13509468723 7335 110349 117684 13736230513 2481 24681 27162 13956435636 132 1512 1644 13846544121 264 0 264 。。。 。。。
2.需求分析
3.代码实现
(1)FlowBean
对象在在需求1基础上增加了比较功能package com.xxx.mapreduce.sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; // 反序列化时,需要反射调用空参构造函数,所以必须有 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } /** * 序列化方法 * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo(FlowBean o) { int result; // 按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; }else if (sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; } }
(2)编写
Mapper
类package com.xxx.mapreduce.sort; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ FlowBean bean = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 封装对象 String phoneNbr = fields[0]; long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); bean.set(upFlow, downFlow); v.set(phoneNbr); // 4 输出 context.write(bean, v); } }
(3)编写
Reducer
类package com.xxx.mapreduce.sort; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ @Override protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 循环输出,避免总流量相同情况 for (Text text : values) { context.write(text, key); } } }
(4)编写
Driver
类package com.xxx.mapreduce.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSortDriver { public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[]{"e:/output1","e:/output2"}; // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(FlowCountSortDriver.class); // 3 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 4 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
6️⃣
WritableComparable
排序案例实操(区内排序)
1.需求 : 要求每个省份手机号输出的文件中按照总流量内部排序。
2.需求分析 : 基于前一个需求,增加自定义分区类,分区按照省份手机号设置。3.案例实操
(1)增加自定义分区类package com.xxx.mapreduce.sort; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner<FlowBean, Text> { @Override public int getPartition(FlowBean key, Text value, int numPartitions) { // 1 获取手机号码前三位 String preNum = value.toString().substring(0, 3); int partition = 4; // 2 根据手机号归属地设置分区 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
(2)在驱动类中添加分区类
// 加载自定义分区类 job.setPartitionerClass(ProvincePartitioner.class); // 设置Reducetask个数 job.setNumReduceTasks(5);
7️⃣
Combiner
合并自定义Combiner
实现步骤
(a)自定义一个Combiner
继承Reducer
,重写Reduce
方法public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1 汇总操作 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 写出 context.write(key, new IntWritable(count)); } }
(b)在
Job
驱动类中设置:job.setCombinerClass(WordcountCombiner.class);
8️⃣
Combiner
合并案例实操
1.需求
统计过程中对每一个MapTask
的输出进行局部汇总,以减小网络传输量即采用Combiner
功能。
(1)数据输入banzhang ni hao xihuan hadoop banzhang banzhang ni hao xihuan hadoop banzhang
(2)期望输出数据
期望:Combiner
输入数据多,输出时经过合并,输出数据降低。
2.需求分析3.案例实操-方案一
1)增加一个WordcountCombiner
类继承Reducer
package com.xxx.mr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 汇总 int sum = 0; for(IntWritable value :values){ sum += value.get(); } v.set(sum); // 2 写出 context.write(key, v); } }
2)在
WordcountDriver
驱动类中指定Combiner
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑 job.setCombinerClass(WordcountCombiner.class);
4.案例实操-方案二
1)将WordcountReducer
作为Combiner
在WordcountDriver
驱动类中指定// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑 job.setCombinerClass(WordcountReducer.class);
- 运行程序见下图所示
9️⃣GroupingComparator
分组(辅助排序)
对Reduce
阶段的数据根据某一个或几个字段进行分组。
分组排序步骤:
(1)自定义类继承WritableComparator
(2)重写compare()
方法@Override public int compare(WritableComparable a, WritableComparable b) { // 比较的业务逻辑 return result; }
(3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() { super(OrderBean.class, true); }
🔟
GroupingComparator
分组案例实操
1.需求 : 有如下订单数据现在需要求出每一个订单中最贵的商品。
(1)输入数据0000001 Pdt_01 222.8 0000002 Pdt_05 722.4 0000001 Pdt_02 33.8 0000003 Pdt_06 232.8 0000003 Pdt_02 33.8 0000002 Pdt_03 522.8 0000002 Pdt_04 122.4
(2)期望输出数据
1 222.8 2 722.4 3 232.8
2.需求分析
(1)利用“订单id
和成交金额”作为key
,可以将Map
阶段读取到的所有订单数据按照id
升序排序,如果id
相同再按照金额降序排序,发送到Reduce
。
(2)在Reduce
端利用groupingComparator
将订单id
相同的kv
聚合成组,然后取第一个即是该订单中最贵商品,如下图所示。3.代码实现
(1)定义订单信息OrderBean
类package com.xxx.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private int order_id; // 订单id号 private double price; // 价格 public OrderBean() { super(); } public OrderBean(int order_id, double price) { super(); this.order_id = order_id; this.price = price; } @Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); } @Override public void readFields(DataInput in) throws IOException { order_id = in.readInt(); price = in.readDouble(); } @Override public String toString() { return order_id + "\t" + price; } public int getOrder_id() { return order_id; } public void setOrder_id(int order_id) { this.order_id = order_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } // 二次排序 @Override public int compareTo(OrderBean o) { int result; if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 价格倒序排序 result = price > o.getPrice() ? -1 : 1; } return result; } }
(2)编写
OrderSortMapper
类package com.xxx.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 封装对象 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(k, NullWritable.get()); } }
(3)编写
OrderSortGroupingComparator
类package com.xxx.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } }
(4)编写
OrderSortReducer
类package com.xxx.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }
(5)编写
OrderSortDriver
类package com.xxx.mapreduce.order; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderDriver { public static void main(String[] args) throws Exception, IOException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[]{"e:/input/inputorder" , "e:/output1"}; // 1 获取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包加载路径 job.setJarByClass(OrderDriver.class); // 3 加载map/reduce类 job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); // 4 设置map输出数据key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出数据的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 设置输入数据和输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 8 设置reduce端的分组 >job.setGroupingComparatorClass(OrderGroupingComparator.class); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
二 MapTask
工作机制
(1)Read
阶段:MapTask
通过用户编写的RecordReader
,从输入InputSplit
中解析出一个个key/value
。
(2)Map
阶段:该节点主要是将解析出的key/value
交给用户编写map()
函数处理,并产生一系列新的key/value
。
(3)Collect
收集阶段:在用户编写map()
函数中,当数据处理完成后,一般会调用OutputCollector.collect()
输出结果。在该函数内部,它会将生成的key/value
分区(调用Partitioner
),并写入一个环形内存缓冲区中。
(4)Spill
阶段:即“溢写”,当环形缓冲区满后,MapReduce
会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition
进行排序,然后按照key
进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key
有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out
(N
表示当前溢写次数)中。如果用户设置了Combiner
,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord
中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB
,则将内存索引写到文件output/spillN.out.index
中。
(5)Combine
阶段:当所有数据处理完成后,MapTask
对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask
会将所有临时文件合并成一个大文件,并保存到文件output/file.out
中,同时生成相应的索引文件output/file.out.index
。
在进行文件合并过程中,MapTask
以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor
(默认10
)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask
最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
三 ReduceTask
工作机制
(1)Copy
阶段:ReduceTask
从各个MapTask
上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge
阶段:在远程拷贝数据的同时,ReduceTask
启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort
阶段:按照MapReduce
语义,用户编写reduce()
函数输入数据是按key
进行聚集的一组数据。为了将key
相同的数据聚在一起,Hadoop
采用了基于排序的策略。由于各个MapTask
已经实现对自己的处理结果进行了局部排序,因此,ReduceTask
只需对所有数据进行一次归并排序即可。
(4)Reduce
阶段:reduce()
函数将计算结果写到HDFS
上。
1.设置ReduceTask
并行度(个数)
ReduceTask
的并行度同样影响整个Job
的执行并发度和执行效率,但与MapTask
的并发数由切片数决定不同,ReduceTask
数量的决定是可以直接手动设置:job.setNumReduceTasks(4); // 默认值是1,手动设置为4
;
2.实验:测试ReduceTask
多少合适
(1)实验环境:1
个Master
节点,16
个Slave
节点:CPU
:8GHZ
,内存:2G
(2)实验结论:(数据量为1GB
)3.注意事项
四 OutputFormat
数据输出
1️⃣
OutputFormat
接口实现类2️⃣自定义OutputFormat
3️⃣自定义OutputFormat
案例实操
1.需求
过滤输入的log
日志,包含xxx
的网站输出到e:/xxx.log
,不包含xxx
的网站输出到e:/other.log
。
(1)输入数据http://www.baidu.com http://www.google.com http://cn.bing.com http://www.xxx.com http://www.sohu.com http://www.sina.com http://www.sin2a.com http://www.sin2desa.com http://www.sindsafa.com
(2)期望输出数据
xxx.log :http://www.xxx.com
other.log :http://cn.bing.com http://www.baidu.com http://www.google.com http://www.sin2a.com http://www.sin2desa.com http://www.sina.com http://www.sindsafa.com http://www.sohu.com
2.需求分析
3.案例实操
(1)编写FilterMapper
类package com.xxx.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); } }
(2)编写
FilterReducer
类package com.xxx.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); } }
(3)自定义一个
OutputFormat
类package com.xxx.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 创建一个RecordWriter return new FilterRecordWriter(job); } }
(4)编写
RecordWriter
类package com.xxx.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream xxxOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path xxxPath = new Path("e:/xxx.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 xxxOut = fs.create(xxxPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“xxx”输出到不同文件 if (key.toString().contains("xxx")) { xxxOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(xxxOut); IOUtils.closeStream(otherOut); } }
(5)编写
FilterDriver
类package com.xxx.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterDriver { public static void main(String[] args) throws Exception { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/inputoutputformat", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }