Big Data Primer (5) - The Distributed Computing Framework MapReduce

1 Overview

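  • MapReduce originates from Google's MapReduce paper, published in December 2004
  • Hadoop MapReduce is essentially an open-source implementation of Google's MapReduce
  • MapReduce's strength is offline (batch) processing of massive data sets. It is also easy to develop with, because the framework encapsulates the details of distributed computing for us. Its hardware requirements are modest, so it can run on inexpensive commodity machines
  • MapReduce also has drawbacks. The main one is that it cannot do real-time stream computing; it can only process data offline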



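2 The MapReduce Programming Model

Take WordCount, which counts how many times each word occurs, as an example. Given the input:
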
hello world
hello hadoop
hello MapReduce

WordCount produces the following output:

hello 3
world 1
hadoop 1
MapReduce 1





  • Diagram of the WordCount flow when run with MapReduce

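  • The input data set is divided into multiple blocks, which are placed on different nodes and processed in parallel

  • In the Splitting stage the input is cut into splits (in this small example, each line can be treated as its own split), and each split is handed to the Mapping stage

  • In the Mapping stage each line is broken into words according to a delimiter or splitting rule, and every word is emitted as a (word, 1) pair

  • In the Shuffling stage, pairs with the same word are transferred over the network to the same node and grouped together

  • In the Reducing stage, the grouped counts for each word are merged (summed), and the final result is written to a file in HDFS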
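The stages above can be imitated inside a single JVM to make the data flow concrete. The sketch below is plain Java with no Hadoop classes; `WordCountFlowSketch` and its methods are hypothetical names, each input line stands in for one split, and the network transfer of the shuffle is reduced to grouping in a `TreeMap`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of WordCount's Mapping -> Shuffling -> Reducing stages.
// Illustrative only: no Hadoop APIs are involved.
class WordCountFlowSketch {

    // Mapping: split one line on whitespace and emit a (word, 1) pair per word.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Shuffling: gather all values that share the same key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reducing: sum the grouped counts of each word.
    static Map<String, Integer> reduce(Map<String, List<Integer>> groups) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            int sum = 0;
            for (int v : g.getValue()) {
                sum += v;
            }
            result.put(g.getKey(), sum);
        }
        return result;
    }

    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (String line : lines) { // each line plays the role of one split
            all.addAll(map(line));
        }
        return reduce(shuffle(all));
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello hadoop", "hello MapReduce")));
    }
}
```

Running `main` prints `{MapReduce=1, hadoop=1, hello=3, world=1}`, the same counts as the expected WordCount output; `TreeMap` orders keys by `String.compareTo`, so the capitalized word comes first.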


3 MapReduce Execution Flow


  • The Map phase corresponds to a set of Map Tasks
  • Likewise, the Reduce phase corresponds to a set of Reduce Tasks



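Hadoop's Writable interface is a serialization protocol built on DataInput and DataOutput. It is compact (it uses storage space efficiently) and fast (reading and writing data, serialization and deserialization all have low overhead).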

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
 * framework implements this interface.</p>
 *
 * <p>Implementations typically implement a static <code>read(DataInput)</code>
 * method which constructs a new instance, calls {@link #readFields(DataInput)} 
 * and returns the instance.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritable implements Writable {
 *       // Some data     
 *       private int counter;
 *       private long timestamp;
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
  /**
   * Serialize the fields of this object to <code>out</code>.
   */
  void write(DataOutput out) throws IOException;

  /**
   * Deserialize the fields of this object from <code>in</code>.  
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   */
  void readFields(DataInput in) throws IOException;
}




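The following example shows how to define a custom Writable class. A custom Writable class must first implement the Writable or WritableComparable interface, and then provide the write(DataOutput out) and readFields(DataInput in) methods, which control how the object is turned into a byte stream (write) and how it is rebuilt from that byte stream as a Writable object (readFields).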

package com.javaedge.hadoop.project;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;

/**
 * This MyWritable class demonstrates how to write a custom Writable class
 * @author sss
 * @date 2019-04-07
 */
public class MyWritable implements Writable {
    private VLongWritable field1;
    private VLongWritable field2;

    public MyWritable() {
        this.set(new VLongWritable(), new VLongWritable());
    }

    public MyWritable(VLongWritable fld1, VLongWritable fld2) {
        this.set(fld1, fld2);
    }

    public void set(VLongWritable fld1, VLongWritable fld2) {
        // make sure the smaller field is always put as field1
        if (fld1.get() <= fld2.get()) {
            this.field1 = fld1;
            this.field2 = fld2;
        } else {
            this.field1 = fld2;
            this.field2 = fld1;
        }
    }

    /**
     * How to write and read MyWritable fields from DataOutput and DataInput stream
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // delegate to each field's own serialization, in a fixed order
        field1.write(out);
        field2.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // read the fields back in exactly the order write() emitted them
        field1.readFields(in);
        field2.readFields(in);
    }

    /**
     * Returns true if <code>o</code> is a MyWritable with the same values.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MyWritable)) {
            return false;
        }
        MyWritable other = (MyWritable) o;
        return field1.equals(other.field1) && field2.equals(other.field2);
    }

    @Override
    public int hashCode() {
        return field1.hashCode() * 163 + field2.hashCode();
    }

    @Override
    public String toString() {
        return field1.toString() + "\t" + field2.toString();
    }
}
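The essential contract of a Writable is that readFields reads back exactly what write emitted, in the same order. A JDK-only sketch of that round trip follows, assuming nothing from Hadoop: `SimpleWritable` is a hypothetical stand-in for `org.apache.hadoop.io.Writable`, and `LongPair` mirrors MyWritable's "smaller value first" rule but stores plain longs with `writeLong` (a fixed 8 bytes each) instead of VLongWritable's variable-length encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for org.apache.hadoop.io.Writable, so this compiles with the JDK alone.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical pair type: keeps the smaller value in field1, like MyWritable.set().
class LongPair implements SimpleWritable {
    private long field1;
    private long field2;

    LongPair() { }

    LongPair(long a, long b) {
        set(a, b);
    }

    void set(long a, long b) {
        field1 = Math.min(a, b);
        field2 = Math.max(a, b);
    }

    long first() { return field1; }
    long second() { return field2; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(field1); // fields are written in a fixed order...
        out.writeLong(field2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        field1 = in.readLong(); // ...and must be read back in that same order
        field2 = in.readLong();
    }

    // Serialize any SimpleWritable into a byte array.
    static byte[] toBytes(SimpleWritable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        }
        return bytes.toByteArray();
    }

    // Rebuild a fresh LongPair from bytes produced by toBytes.
    static LongPair fromBytes(byte[] data) throws IOException {
        LongPair p = new LongPair();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            p.readFields(in);
        }
        return p;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = toBytes(new LongPair(42, 7));
        LongPair copy = fromBytes(data);
        System.out.println(data.length + " bytes -> " + copy.first() + "\t" + copy.second());
    }
}
```

Running `main` reports 16 bytes (two 8-byte longs) and restores the pair as 7 and 42, showing that the object's state survives the trip through a byte stream unchanged.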



/**
 * A {@link Writable} which is also {@link Comparable}. 
 * <p><code>WritableComparable</code>s can be compared to each other, typically 
 * via <code>Comparator</code>s. Any type which is to be used as a 
 * <code>key</code> in the Hadoop Map-Reduce framework should implement this
 * interface.</p>
 * <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
 * keys. It's important that your implementation of hashCode() returns the same 
 * result across different instances of the JVM. Note also that the default 
 * <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
 * satisfy this property.</p>
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       public int compareTo(MyWritableComparable o) {
 *         int thisValue = this.counter;
 *         int thatValue = o.counter;
 *         return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
 *       }
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));
 *         return result;
 *       }
 *     }
 * </pre></blockquote></p>
 */
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
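WritableComparable adds two things a key needs: an ordering (`compareTo`), which MapReduce uses when it sorts map output by key, and a `hashCode` that is identical on every JVM, which the default partitioner relies on. The JDK-only sketch below shows both; `CounterKey` is a hypothetical class modeled on the javadoc example, ordering by `counter` first and using `timestamp` as a tie-breaker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical key type: compareTo defines the key order, and hashCode depends
// only on field values, so every JVM computes the same hash for equal keys.
class CounterKey implements Comparable<CounterKey> {
    final int counter;
    final long timestamp;

    CounterKey(int counter, long timestamp) {
        this.counter = counter;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(CounterKey o) {
        int c = Integer.compare(counter, o.counter);              // primary order: counter
        return c != 0 ? c : Long.compare(timestamp, o.timestamp); // tie-break: timestamp
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CounterKey)) {
            return false;
        }
        CounterKey k = (CounterKey) o;
        return counter == k.counter && timestamp == k.timestamp;
    }

    // Same formula as the javadoc example; unlike Object.hashCode(),
    // the result does not depend on object identity or the running JVM.
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + counter;
        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return counter + "@" + timestamp;
    }

    public static void main(String[] args) {
        List<CounterKey> keys = new ArrayList<>(Arrays.asList(
                new CounterKey(3, 100L), new CounterKey(1, 200L), new CounterKey(3, 50L)));
        Collections.sort(keys);   // orders keys using compareTo
        System.out.println(keys); // [1@200, 3@50, 3@100]
    }
}
```

Keeping `equals`, `compareTo` and `hashCode` consistent with each other means two keys that compare as equal are also grouped into the same reduce call and sent to the same partition.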




package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

/**
 * <ol>
 *   <li>
 *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
 *   which is then assigned to an individual {@link Mapper}.
 *   </li>
 *   <li>
 *   Provide the {@link RecordReader} implementation to be used to glean
 *   input records from the logical <code>InputSplit</code> for processing by 
 *   the {@link Mapper}.
 *   </li>
 * </ol>
 * <p>The default behavior of file-based {@link InputFormat}s, typically 
 * sub-classes of {@link FileInputFormat}, is to split the 
 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
 * bytes, of the input files. However, the {@link FileSystem} blocksize of  
 * the input files is treated as an upper bound for input splits. A lower bound 
 * on the split size can be set via 
 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
 * <p>Clearly, logical splits based on input-size is insufficient for many 
 * applications since record boundaries are to respected. In such cases, the
 * application has to also implement a {@link RecordReader} on whom lies the
 * responsibility to respect record-boundaries and present a record-oriented
 * view of the logical <code>InputSplit</code> to the individual task.
 */
public abstract class InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.  
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;
}
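To make the "record-oriented view" concrete: with the default TextInputFormat, the RecordReader hands the Mapper one line at a time, keyed by the byte offset where that line starts (a LongWritable key and a Text value in Hadoop). The sketch below reproduces that view with plain JDK types; `LineRecordsSketch` and `LineRecord` are hypothetical names, and unlike Hadoop's LineRecordReader it ignores `\r\n` line endings and lines that cross split boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of the (byte offset, line) records a line-based RecordReader presents to a Mapper.
class LineRecordsSketch {

    static final class LineRecord {
        final long offset;  // byte position where the line starts (the "key")
        final String line;  // line contents without the '\n' (the "value")

        LineRecord(long offset, String line) {
            this.offset = offset;
            this.line = line;
        }

        @Override
        public String toString() {
            return "(" + offset + ", " + line + ")";
        }
    }

    // Turn raw bytes into (offset, line) records, splitting on '\n' only.
    static List<LineRecord> records(byte[] data) {
        List<LineRecord> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= data.length; i++) {
            boolean atEnd = (i == data.length);
            if (atEnd || data[i] == '\n') {
                // at the end of input, emit only a final line that lacks a trailing '\n'
                if (!atEnd || i > start) {
                    String line = new String(data, start, i - start, StandardCharsets.UTF_8);
                    out.add(new LineRecord(start, line));
                }
                start = i + 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] input = "hello world\nhello hadoop\nhello MapReduce\n".getBytes(StandardCharsets.UTF_8);
        for (LineRecord r : records(input)) {
            System.out.println(r); // (0, hello world), then (12, hello hadoop), then (25, hello MapReduce)
        }
    }
}
```

The offsets are byte positions rather than line numbers, which is why each Map Task can compute them locally for its own split without knowing how many lines came before it.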





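  • In HDFS, the block is the smallest unit of storage; the default block size is 128 MB
  • By default, HDFS blocks and MapReduce InputSplits correspond one-to-one. We can change this relationship manually, but doing so is not recommended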
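How many splits, and therefore Map Tasks, a file produces follows from the block size. Based on my reading of FileInputFormat in Hadoop 2.x/3.x, the split size is `max(minSize, min(maxSize, blockSize))`, where minSize and maxSize come from `mapreduce.input.fileinputformat.split.minsize` and `mapreduce.input.fileinputformat.split.maxsize`, and a new split is cut only while the remaining bytes exceed 1.1 times the split size. The JDK-only sketch below reproduces that arithmetic; `SplitSizeSketch` is a hypothetical name and the file sizes are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of FileInputFormat's split-size arithmetic (no Hadoop classes):
//   splitSize = max(minSize, min(maxSize, blockSize))
// plus the 10% slop rule: a tail up to 1.1 x splitSize stays in one split.
class SplitSizeSketch {
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Length of each split for a single (splittable) file.
    static List<Long> splitLengths(long fileSize, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileSize;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long split = computeSplitSize(128 * mb, 1, Long.MAX_VALUE); // defaults -> block size
        System.out.println(split / mb);                                // 128
        System.out.println(splitLengths(300 * mb, split).size());     // 3 (128 + 128 + 44 MB)
        System.out.println(splitLengths(130 * mb, split).size());     // 1 (within the 10% slop)
        long half = computeSplitSize(128 * mb, 1, 64 * mb);           // maxSize lowered to 64 MB
        System.out.println(splitLengths(128 * mb, half).size());      // 2 splits for one block
    }
}
```

With the defaults the split size equals the 128 MB block, giving the one-to-one block/split mapping; lowering the maximum split size to 64 MB makes one block yield two splits, the manually configured case described in the core concepts section.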



package org.apache.hadoop.mapreduce;

import java.io.IOException;

/**
 * <code>OutputFormat</code> describes the output-specification for a 
 * Map-Reduce job.
 * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
 * job to:<p>
 * <ol>
 *   <li>
 *   Validate the output-specification of the job. For e.g. check that the 
 *   output directory doesn't already exist. 
 *   <li>
 *   Provide the {@link RecordWriter} implementation to be used to write out
 *   the output files of the job. Output files are stored in a 
 *   {@link FileSystem}.
 *   </li>
 * </ol>
 */
public abstract class OutputFormat<K, V> {

  /**
   * Get the {@link RecordWriter} for the given task.
   * @param context the information about the current task.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;

  /**
   * Check for validity of the output-specification for the job.
   * <p>This is to validate the output specification for the job when it is
   * a job is submitted.  Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.</p>
   * @param context information about the job
   */
  public abstract void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException;

  /**
   * Get the output committer for this output format. This is responsible
   * for ensuring the output is committed correctly.
   * @param context the task context
   * @return an output committer
   */
  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}
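The two OutputFormat duties described above can be sketched on the local file system: refuse to start if the output directory already exists (the check that keeps earlier results from being overwritten), and write each record as key, tab, value, newline, which is the default layout of TextOutputFormat. `OutputSpecSketch` is a hypothetical JDK-only class; `java.nio.file.FileAlreadyExistsException` stands in for Hadoop's own exception type.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Sketch of OutputFormat's two responsibilities, without Hadoop:
// 1) checkOutputSpecs: reject an output directory that already exists;
// 2) a record writer that emits "key<TAB>value" lines.
class OutputSpecSketch {

    static void checkOutputSpecs(Path outputDir) throws IOException {
        if (Files.exists(outputDir)) {
            throw new FileAlreadyExistsException(outputDir.toString());
        }
    }

    static <K, V> void writeRecords(Map<K, V> records, Writer out) throws IOException {
        for (Map.Entry<K, V> e : records.entrySet()) {
            out.write(e.getKey() + "\t" + e.getValue() + "\n");
        }
    }

    public static void main(String[] args) throws IOException {
        Path existing = Files.createTempDirectory("wc-out");
        try {
            checkOutputSpecs(existing);
        } catch (FileAlreadyExistsException e) {
            System.out.println("rejected existing output dir: " + existing);
        } finally {
            Files.delete(existing);
        }

        Map<String, Integer> counts = new TreeMap<>();
        counts.put("hello", 3);
        counts.put("world", 1);
        StringWriter sw = new StringWriter();
        writeRecords(counts, sw);
        System.out.print(sw); // hello<TAB>3 and world<TAB>1, one per line
    }
}
```

Failing early on an existing directory is a deliberate design choice: a job that silently overwrote a previous run's output would be hard to detect after the fact.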



  • OutputFormat

3 MapReduce Core Concepts

  • Suppose we manually configure the block-to-split relationship so that one block corresponds to two splits

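In the figure above, one block corresponds to two splits (the default is one-to-one), and each split corresponds to one Map Task. Once the Map Tasks have processed and grouped their data, it goes through the Shuffle and then on to Reduce for output. Each Reduce Task writes to its own output file; the figure shows three Reduce Tasks, so the job produces three output files.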

3.1 Split

4 MapReduce 1.x Architecture

4.1 JobTracker(JT)


4.2 TaskTracker


4.3 MapTask


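  • It parses each record of its input and passes it to the user's own map method
  • When processing is complete, it writes the map output to the local disk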


4.4 ReduceTask

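  • Reads the data output by the MapTasks
  • Groups the data by key and passes each group to the reduce method we wrote
  • When processing is complete, writes the output to HDFS by default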
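The ReduceTask steps above can be sketched in plain Java: the fetched map outputs are sorted by key, and each run of equal keys is passed, with all of its values, to the user's reduce function. `SortGroupSketch`, `pair` and `reduceSorted` are hypothetical names; real Hadoop merges already-sorted map output files from many MapTasks rather than sorting one in-memory list.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Sketch of a ReduceTask's sort-then-group loop over fetched (key, value) pairs.
class SortGroupSketch {

    static <V> Map.Entry<String, V> pair(String key, V value) {
        return new SimpleEntry<>(key, value);
    }

    static <V, R> List<Map.Entry<String, R>> reduceSorted(
            List<Map.Entry<String, V>> fetched,
            BiFunction<String, List<V>, R> reduceFn) {
        List<Map.Entry<String, V>> sorted = new ArrayList<>(fetched);
        sorted.sort(Map.Entry.comparingByKey()); // sort by key before reducing
        List<Map.Entry<String, R>> output = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i).getKey();
            List<V> values = new ArrayList<>();
            // equal keys are now adjacent: collect the whole run
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                values.add(sorted.get(i).getValue());
                i++;
            }
            output.add(pair(key, reduceFn.apply(key, values))); // one reduce call per key
        }
        return output;
    }

    public static void main(String[] args) {
        // pairs as they might arrive from two different MapTasks
        List<Map.Entry<String, Integer>> fetched = Arrays.asList(
                pair("hello", 1), pair("world", 1), pair("hadoop", 1), pair("hello", 1));
        BiFunction<String, List<Integer>, Integer> sum = (k, vs) -> {
            int s = 0;
            for (int v : vs) {
                s += v;
            }
            return s;
        };
        System.out.println(reduceSorted(fetched, sum)); // [hadoop=1, hello=2, world=1]
    }
}
```

Sorting first is what lets the reduce side stream through its input once, holding only the current key's values instead of a map of every key.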

5 MapReduce 2.x Architecture


The MapReduce 2.x architecture was already covered in Big Data Primer (4) - Distributed Resource Scheduling: The YARN Framework.

6 Implementing WordCount in Java

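  • 1. Create a Maven project and configure the dependencies as follows

  • 2. Create a class and start writing the WordCount implementation:

  • 3. When the code is finished, compile and package it with Maven in IDEA

  • 4. Upload the packaged jar to the server:

  • Upload it to the Hadoop server

  • Using the full HDFS path also works: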

hadoop fs -ls hdfs://localhost:9000

7 Refactoring

8 Combiner Programming
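A Combiner runs reduce-style logic on the output of a single Map Task before the shuffle, so fewer (word, 1) pairs travel over the network. The JDK-only sketch below (`CombinerSketch` and its methods are hypothetical) compares the number of shuffled records with and without local combining. Summation is safe to combine early because it is associative and commutative; Hadoop may run a combiner zero, one or several times, so the final result must not depend on whether it ran.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the Combiner idea: partial aggregation per map task, before the shuffle.
class CombinerSketch {

    // Map side: one (word, 1) record per occurrence, represented as a list of words.
    static List<String> mapOutput(String line) {
        return Arrays.asList(line.trim().split("\\s+"));
    }

    // Combiner: the reducer's summing logic, applied locally to one map task's output.
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> partial = new LinkedHashMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // Reducer: merges the partial sums that arrive from several map tasks.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new LinkedHashMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((w, c) -> total.merge(w, c, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> task1 = mapOutput("hello hello hello world");
        List<String> task2 = mapOutput("hello hadoop");
        Map<String, Integer> c1 = combine(task1);
        Map<String, Integer> c2 = combine(task2);
        // records crossing the shuffle: 6 without a combiner, 4 with one
        System.out.println((task1.size() + task2.size()) + " -> " + (c1.size() + c2.size()));
        System.out.println(reduce(Arrays.asList(c1, c2))); // {hello=4, world=1, hadoop=1}
    }
}
```

By contrast, an operation such as averaging cannot reuse the reducer as a combiner in this way, because an average of partial averages is generally not the overall average.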

9 Partitioner
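The Partitioner decides which Reduce Task, and therefore which output file, receives each key. Hadoop's default HashPartitioner computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`; the sketch below reproduces only that formula in plain Java. `HashPartitionSketch` is a hypothetical name, and here `String.hashCode` stands in for the key's hash: Hadoop's Text computes its hash differently, so actual partition numbers for Text keys will differ.

```java
// Sketch of the default hash-partitioning rule:
//   partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
// The mask clears the sign bit so negative hash codes still give a valid index.
class HashPartitionSketch {

    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3; // three Reduce Tasks -> three output files
        for (String word : new String[] {"hello", "world", "hadoop", "MapReduce"}) {
            System.out.println(word + " -> reducer " + getPartition(word, reducers));
        }
    }
}
```

Because equal keys always have the same hash, every occurrence of a word lands in the same Reduce Task, which is what makes the per-word totals correct.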

10 JobHistoryServer
