【大数据学习】第十一篇-MapReduce简介

MapReduce定义

MapReduce是一个分布式计算的框架,是用户开发机遇hadoop的数据分析应用的核心框架。

MapReduce的优缺点

  • 优点
  1. 易于编程 只要实现一些简单的接口即可实现功能,且编写程序类似串行
  2. 良好的扩展性 支持扩展计算服务器的数量
  3. 高容错性 可以在价格低廉的机器上运行,即便集群中某些节点宕机,也可以正常使用
  4. 适合PB级离线计算
  • 缺点
    不擅长实时计算、流式计算、DAT计算

MapReduce的编程思想

MapReduce的编程思想

MapReduce主要包括两个部分 Map阶段 + Reduce阶段,每个阶段中的输入输出都是key-value的形式存在
已文本词数统计为例,两个阶段的流程如下:

  1. Map阶段读取Hadoop分片的数据,按行读取自动进行一次map操作,得到输入key-value对应为 “偏移量-本行数据”。
    偏移量实际是该行起始的数据长度索引,可以理解为行号,例如第一行偏移量为0,数据10byte,则第二行偏移量为11。
  2. Map阶段第二步执行我们实现的接口算法,并将结果的key-value(单词-每行的词频 如 java - 1 2 1 4 1)输出都磁盘上。整个Map阶段都是完全并行执行的。
  3. Reduce阶段读取Map的结果,执行实现的接口,对每个分片的结果进行初次的汇总
  4. Reduce阶段对每个分片的结果再次进行汇总成为一个最终结果
    注:通常一个分片对应hadoop中存储的一个块,即128M,这也可以避免载入内存文件过大,撑爆内存

实现wordcount编码

Map类,继承Mapper,重写其map方法实现对每个单词的统计,范型是根据自己业务需要定义的类型

package com.irving.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * map执行类
 * 四个范型是map输入和输出的类型
 * @LongWritable 字符偏移量
 * @Author yuanyc
 * @Date 15:17 2019-07-11
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 重写map方法
     * @Author yuanyc
     * @Date 15:19 2019-07-11
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 每行起始的偏移量
        System.out.println("-------");
        System.out.println("偏移量" + key.get());
        // 按行读取的数据
        String line = value.toString();
        // 根据空格切分str
        String[] arr = line.split(" ");
        // 对字符传标记1
        for (String str : arr) {
            context.write(new Text(str), new IntWritable(1));
        }
    }
}

Reduce类,继承Reducer类,重写reduce方法,实现对map结果的汇总

package com.irving.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce类
 * @Author yuanyc
 * @Date 11:17 2019-07-14
 */
public class WordCountRecuder extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // values 形如 1, 2,2,1,1
        // 对词频进行统计
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

编写启动类,执行mapreduce算法
注:输入输出路径应当为hdfs的目录,但是本地调试阶段可以使用linux文件系统目录

package com.irving.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 启动类
 * @Author yuanyc
 * @Date 15:39 2019-07-11
 */
public class WordCountMain {
    public static void main(String[] args) {

        Configuration configuration = new Configuration();
//        args = new String[]{"/Users/yuanyc/Documents/workspace/hdfs/test.txt", "/Users/yuanyc/Documents/workspace/hdfs/out"};

        try {
            // 创建job
            Job job = Job.getInstance(configuration);
            job.setJarByClass(WordCountMain.class);

            // 指定map类
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 指定reduce
            job.setReducerClass(WordCountRecuder.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 指定输入输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 提交任务
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

测试数据


测试数据

本地执行结果


分片为1

执行结果

注:输出目录不能重复存在,要重新执行时需要删除现有目录

Hadoop的序列化

通过上面代码可以看出,MR在编码过程中使用的输入输出对象类型都是不是自定义的类型。
使用的这些类型是Hadoop定义的基础类型,由于mapreduce过程中伴随大量的IO操作,因此需要针对序列化进行性能优化。
Java常用类型与Hadoop序列化类型的对照表

JDK的类型 Hadoop序列化类型
int IntWritable
long LongWritable
float FloatWritable
double DoubleWritable
byte ByteWritable
boolean BooleanWritable
String Text
Map MapWritable
Array ArrayWritable

自定义Java对象的序列化

自定义的Bean需要实现writable接口,重写序列化和反序列化的方法

package com.irving.wordcount;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 自定义bean的hadoop序列化
 * @Author yuanyc
 * @Date 12:37 2019-07-14
 */
public class BeanWritable implements Writable, Comparable {

    private String name;
    private int age;

    /**
     * 序列化方法
     * @Author yuanyc
     * @Date 12:53 2019-07-14
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeChars(name);
        dataOutput.writeInt(age);
    }

    /**
     * 反序列化,顺序要与序列化一致
     * @Author yuanyc
     * @Date 12:53 2019-07-14
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        age = dataInput.readInt();
    }

    /**
     * 自定义对象用key时,需要重写compareto方法,用于shuffle阶段的排序
     * @Author yuanyc
     * @Date 12:52 2019-07-14
     */
    @Override
    public int compareTo(Object o) {
        return 0;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容