Big Data Learning with Hadoop: 07 MapReduce Exercises 01 (WordCount + TopN)

Welcome to follow my CSDN: https://blog.csdn.net/bingque6535

1. Writing WordCount

  1. Driver

    package com.hjf.mr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 9:49
     */
    public class WordCountJob {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Create the Configuration and Job objects
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // Specify the class Hadoop uses to locate the job jar, i.e. the current driver class
            job.setJarByClass(WordCountJob.class);
    
            // Specify the Mapper and Reducer classes
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            // Specify the types of the Mapper's output key and value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // Specify the types of the Reducer's output key and value
            // If the Mapper and Reducer output the same key/value types, the setMapOutput* calls above can be omitted
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            Path inputPath = new Path("./Data/words.txt");
            Path outputPath = new Path("./Data/result");
    
            // Delete the output path if it already exists
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)){
                fs.delete(outputPath, true);
            }
    
            // Set the input path for the data and the output path for the results
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
            // job.submit();
        }
    }
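
Note: the input and output paths above point at the local filesystem ("./Data/..."), so with a default Configuration this driver runs in Hadoop's local mode straight from the IDE; on a real cluster you would pass HDFS paths instead and submit the packaged jar with the hadoop jar command.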
    
    
  2. Mapper

    package com.hjf.mr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 9:54
     *
     *  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *      KEYIN: type of the Mapper's input key; usually LongWritable (the byte offset of the current line)
     *      VALUEIN: type of the Mapper's input value
     *      KEYOUT: type of the Mapper's output key
     *      VALUEOUT: type of the Mapper's output value
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable one = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the Text to a String
            String lines = value.toString();
            String[] words = lines.split(" ");
            // word --> (word, 1)
            for (String word: words) {
                context.write(new Text(word), one);
            }
        }
    }
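
A small optional optimization, not part of the original code: the Mapper above allocates a new Text for every word. Because context.write() serializes the pair immediately, a single reusable Text instance is safe and avoids the per-record allocation; a minimal sketch (imports as in the original class):

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reuse the same output key and value objects across all map() calls
        private final Text outKey = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String word : value.toString().split(" ")) {
                outKey.set(word);           // refill the reusable Text
                context.write(outKey, one); // the pair is serialized here, so reuse is safe
            }
        }
    }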
    
  3. Reducer

    package com.hjf.mr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 9:53
     *  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *      KEYIN: type of the Reducer's input key
     *      VALUEIN: type of the Reducer's input value
     *      KEYOUT: type of the Reducer's output key
     *      VALUEOUT: type of the Reducer's output value
     */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // Accumulate the values belonging to the same key
            for (IntWritable value: values) {
                sum += value.get();
            }
            IntWritable count = new IntWritable(sum);
            context.write(key, count);
        }
    }
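
To sanity-check the job end to end, suppose words.txt contains the two lines below (hypothetical sample data):

    hello world
    hello hadoop

The job would then write a part-r-00000 file under ./Data/result like this (key and count are tab-separated by default):

    hadoop  1
    hello   2
    world   1

Since summing counts is associative and commutative, the same reducer class can also serve as a combiner to pre-aggregate on the map side and shrink the shuffle; one extra driver line is enough:

    job.setCombinerClass(WordCountReducer.class);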
    

2. TopN

  1. Problems

    1. Compute the average score of each course.
    2. Compute each student's average score for each course, write the results into separate files by course (one file per course), sort each file by average score from high to low, and keep one decimal place.
    3. For each course, find the student with the highest average score and output the course, name, and average score.

  2. Dataset

    computer,huangxiaoming,85,86,41,75,93,42,85
    computer,xuzheng,54,52,86,91,42
    computer,huangbo,85,42,96,38
    english,zhaobenshan,54,52,86,91,42,85,75
    english,liuyifei,85,41,75,21,85,96,14
    algorithm,liuyifei,75,85,62,48,54,96,15
    computer,huangjiaju,85,75,86,85,85
    english,liuyifei,76,95,86,74,68,74,48
    english,huangdatou,48,58,67,86,15,33,85
    algorithm,huanglei,76,95,86,74,68,74,48
    algorithm,huangjiaju,85,75,86,85,85,74,86
    computer,huangdatou,48,58,67,86,15,33,85
    english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
    english,huangbo,85,42,96,38,55,47,22
    algorithm,liutao,85,75,85,99,66
    computer,huangzitao,85,86,41,75,93,42,85
    math,wangbaoqiang,85,86,41,75,93,42,85
    computer,liujialing,85,41,75,21,85,96,14,74,86
    computer,liuyifei,75,85,62,48,54,96,15
    computer,liutao,85,75,85,99,66,88,75,91
    computer,huanglei,76,95,86,74,68,74,48
    english,liujialing,75,85,62,48,54,96,15
    math,huanglei,76,95,86,74,68,74,48
    math,huangjiaju,85,75,86,85,85,74,86
    math,liutao,48,58,67,86,15,33,85
    english,huanglei,85,75,85,99,66,88,75,91
    math,xuzheng,54,52,86,91,42,85,75
    math,huangxiaoming,85,75,85,99,66,88,75,91
    math,liujialing,85,86,41,75,93,42,85,75
    english,huangxiaoming,85,86,41,75,93,42,85
    algorithm,huangdatou,48,58,67,86,15,33,85
    algorithm,huangzitao,85,86,41,75,93,42,85,75
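
Each line has the format course,name,score1,score2,...; the number of scores varies from student to student, which is why the code below derives the score count as split.length - 2.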

1. Problem 1

Problem: compute the average score of each course.
Approach:
1. First compute each student's average score for each course.
2. Then average those per-student averages to get the course average.
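
For example, the first record, computer,huangxiaoming,85,86,41,75,93,42,85, has 7 scores summing to 507, so huangxiaoming's per-student average for computer is 507 / 7 ≈ 72.43; the reducer then averages these per-student values across all students of the course.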

  1. Driver

    package com.hjf.mr.top_n.question1;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:06
     *
     */
    public class TopNJob1 {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(TopNJob1.class);
            job.setMapperClass(TopNMapper1.class);
            job.setReducerClass(TopNReducer1.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
    
            Path inputPath = new Path("./Data/score.txt");
            Path outputPath = new Path("./Data/result");
    
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
    
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            job.waitForCompletion(true);
        }
    }
    
    
  2. Mapper

    package com.hjf.mr.top_n.question1;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:12
     */
    public class TopNMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lines = value.toString();
            String[] split = lines.split(",");
            Text courseName = new Text(split[0]);
            double sum = 0.0;
            int count = split.length - 2;
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
            }
            DoubleWritable avg = new DoubleWritable(sum / count);
            context.write(courseName, avg);
    
        }
    }
    
    
  3. Reducer

    package com.hjf.mr.top_n.question1;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:13
     */
    public class TopNReducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0.0;
            int count = 0;
            for (DoubleWritable value: values){
                sum += value.get();
                count += 1;
            }
            DoubleWritable result = new DoubleWritable(sum / count);
            context.write(key, result);
        }
    }
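
Two caveats worth noting: the result here is the unweighted mean of the per-student averages (matching the two-step approach above), not the mean over all raw scores. And unlike WordCount, this reducer must not be registered as a combiner: averaging partial averages of differently sized groups gives a different result; a combiner-safe version would have to shuffle (sum, count) pairs instead.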
    
    

2. Problem 2

Problem: compute each student's average score for each course, write one result file per course, sort each file by average score from high to low, and keep one decimal place.
Approach: wrap (course, name, average) in a custom WritableComparable that sorts by score descending, and use a custom Partitioner so that each course goes to its own reduce task and therefore its own output file.

  1. Driver

    package com.hjf.mr.top_n.question2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:06
     *
     */
    public class TopNJob2 {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            // Register the custom Partitioner class
            job.setPartitionerClass(TopNPartition.class);
            // Set the number of reduce tasks (one per partition)
            job.setNumReduceTasks(4);
    
            job.setJarByClass(TopNJob2.class);
            job.setMapperClass(TopNMapper2.class);
            job.setReducerClass(TopNReducer2.class);
            job.setMapOutputKeyClass(CourseScore.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(CourseScore.class);
            job.setOutputValueClass(NullWritable.class);
            Path inputPath = new Path("./Data/score.txt");
            Path outputPath = new Path("./Data/result");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.waitForCompletion(true);
        }
    }
    
    
  2. Mapper

    package com.hjf.mr.top_n.question2;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:12
     */
    public class TopNMapper2 extends Mapper<LongWritable, Text, CourseScore, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lines = value.toString();
            String[] split = lines.split(",");
            String courseName = split[0];
            String name = split[1];
            double sum = 0.0;
            int count = split.length - 2;
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
            }
            CourseScore courseScore = new CourseScore(courseName, name, sum / count);
            context.write(courseScore, NullWritable.get());
        }
    }
    
    
  3. Reducer

    package com.hjf.mr.top_n.question2;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.text.DecimalFormat;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 11:13
     */
    public class TopNReducer2 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // One formatter is enough for the whole reduce task
            DecimalFormat df = new DecimalFormat("0.0");

            for (NullWritable value: values) {
                // Read the score from the CourseScore key, round it to one
                // decimal place, and re-wrap it in a new CourseScore object
                double score = key.getScore();
                double format = Double.parseDouble(df.format(score));
                CourseScore courseScore = new CourseScore(key.getCourse(), key.getName(), format);

                context.write(courseScore, value);
            }
        }
    }
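
Why reading key inside the values loop works: CourseScore.compareTo() compares only the score, so records with equal averages are grouped into one reduce() call even when they belong to different students. Hadoop reuses a single key instance and refills it as the values iterator advances, so key.getCourse(), key.getName(), and key.getScore() always reflect the current record, and the loop emits every record in the group rather than just the first.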
    
    
  4. Partitioner

    package com.hjf.mr.top_n.question2;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 15:42
     * 
     *  Partitioner<KEY, VALUE>
     *      The Partitioner's key and value types must match the Mapper's output key and value types
     */
    public class TopNPartition extends Partitioner<CourseScore, NullWritable> {
    
        @Override
        public int getPartition(CourseScore courseScore, NullWritable nullWritable, int numPartitions) {
            String course = courseScore.getCourse();
            // Send records for different courses to different partitions
            switch (course) {
                case "algorithm":
                    return 0;
                case "computer":
                    return 1;
                case "english":
                    return 2;
                default:
                    return 3;
            }
        }
    }
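
Partition i is consumed by reduce task i, so with this mapping algorithm lands in part-r-00000, computer in part-r-00001, english in part-r-00002, and math (the default case) in part-r-00003. Every index returned by getPartition() must stay below the value passed to job.setNumReduceTasks(), otherwise the job fails at runtime.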
    
    
  5. Custom key class

    package com.hjf.mr.top_n.question2;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-17 15:48
     */
    public class CourseScore implements WritableComparable<CourseScore> {
        private String course;
        private String name;
        private double score;
        
        // Deserialization reflectively invokes the no-argument constructor, so this empty constructor is required
        public CourseScore() {
        }
    
        public CourseScore(String course, String name, double score) {
            this.course = course;
            this.name = name;
            this.score = score;
        }
    
        public String getCourse() {
            return course;
        }
    
        public double getScore() {
            return score;
        }
    
        public String getName() {
            return name;
        }
    
        @Override
        public int compareTo(CourseScore that) {
            if (this.score > that.score){
                return -1;
            } else if (this.score < that.score) {
                return 1;
            } else {
                return 0;
            }
        }
        
        // Serialization: write out the fields
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(course);
            out.writeUTF(name);
            out.writeDouble(score);
        }
        
        // Deserialization: read the fields back in
        // Note: the read order must exactly match the write order
        @Override
        public void readFields(DataInput in) throws IOException {
            course = in.readUTF();
            name = in.readUTF();
            score = in.readDouble();
        }
    
        @Override
        public String toString() {
            return "课程: " + course + ", 姓名: " + name + ", 分数: " + score;
        }
    }
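
Because compareTo() looks only at the score, two records from different courses with the same average compare as equal; the job still works because the Partitioner keeps each course in its own reduce task, but the key would misbehave in a single-reducer job. A more defensive variant (a sketch, not part of the original code) orders by course first, then by score descending:

    @Override
    public int compareTo(CourseScore that) {
        int byCourse = this.course.compareTo(that.course);
        if (byCourse != 0) {
            return byCourse;                           // keep each course's records together
        }
        return Double.compare(that.score, this.score); // higher scores first within a course
    }

Note that changing compareTo() also changes which records are grouped into a single reduce() call (here, one call per course and score instead of one per score).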
    
    

3. Problem 3

Problem 3 only requires changing the output condition on top of the Problem 2 code: count the records written by each reduce task and stop after the first top records.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.DecimalFormat;

public class TopNReducer3 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {
    // Note: these fields must be declared outside the reduce method,
    // otherwise they would be re-initialized on every call
    // Counts how many records this reduce task has written so far
    int count = 0;
    // To output the top N records, set this to N
    private final int top = 3;
    @Override
    protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable value: values) {
            double score = key.getScore();
            DecimalFormat df = new DecimalFormat("0.0");
            double format = Double.parseDouble(df.format(score));
            CourseScore courseScore = new CourseScore(key.getCourse(), key.getName(), format);
            if (count++ < top){
                context.write(courseScore, value);
            }
        }
    }
}
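
With this dataset each reduce task receives exactly one course (math is alone in the default partition), so the count cutoff is applied per course file. For the literal wording of Problem 3, set top = 1: keys arrive sorted by score descending, so the first record written per course is the highest-scoring student.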

Welcome to follow my CSDN: https://blog.csdn.net/bingque6535
