大数据学习day_6

思考问题

Mapper类

Mapper类

org.apache.hadoop.mapreduce.Mapper<KEYIN、VALUEIN、KEYOUT、VALUEOUT>

四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,
前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;
后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型。

Mapper有setup(),map(),cleanup()和run()四个方法。

其中setup()一般是用来进行一些map()前的准备工作,
map()则一般承担主要的处理工作,
cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。

在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。Partitioner控制每个K-V(键值)对应该被分发到哪个reducer(我们的Job可能有多个reducer),Hadoop默认使用HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

run()方法

public void run(Context context) throws IOException, InterruptedException {  
  setup(context);  
  while (context.nextKeyValue()) {  
    map(context.getCurrentKey(), context.getCurrentValue(), context);  
  }  
  cleanup(context);  
}  

可以得出,K/V对是从传入的Context(上下文)获取的。

map()方法

@SuppressWarnings("unchecked")  
protected void map(KEYIN key, VALUEIN value,   
                   Context context) throws IOException, InterruptedException {  
  context.write((KEYOUT) key, (VALUEOUT) value);  
}  

也看得出输出结果K/V对也是通过Context来完成的
作为map方法输入的键值对,其value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。将<K1,V1>作为map方法的结果输出,其余的工作都交有 MapReduce框架 处理。
这里输入参数key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map 函数。在这里,map 函数没有处理输入的key、value,直接通过context.write(…)方法输出了,输出的key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。

当调用到map时,通常会先执行一个setup函数,最后会执行一个cleanup函数。而默认情况下,这两个函数的内容都是nothing。因此,当map方法不符合应用要求时,可以试着通过增加setup和cleanup的内容来满足应用的需求。

Reducer类

Reducer类

org.apache.hadoop.mapreduce.Reducer<KEYIN、VALUEIN、KEYOUT、VALUEOUT>

四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,
前面两个KEYIN、VALUEIN 指的是map 函数输出的参数,即reduce 函数输入的key、value 的类型;
后面两个KEYOUT、VALUEOUT 指的是reduce 函数输出的key、value 的类型。

Reducer有3个主要的函数,分别是:setup(),clearup(),reduce(),run()。

reducer()

@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, 
Context context ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

run()

 @SuppressWarnings("unchecked")
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      ((ReduceContext.ValueIterator)
          (context.getValues().iterator())).resetBackupStore();
    }
    cleanup(context);
  }
}

当调用到reduce时,通常会先执行一个setup函数,最后会执行一个cleanup函数。而默认情况下,这两个函数的内容都是nothing。因此,当reduce不符合应用要求时,可以试着通过增加setup和cleanup的内容来满足应用的需求。

InputFormat类

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按照我们想要的格式被读取。

所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
其实,一个输入格式InputFormat,主要无非就是要解决如何将数据分割成分片(比如多少行为一个分片),以及如何读取分片中的数据(比如按行读取)。前者由getSplits()完成,后者由RecordReader完成。这些方法的实现都在子类中。

1 public abstract class InputFormat<K, V> {
2     
3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
4 
5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,  InterruptedException;
6 
7 }
InputFormat类图

类InputFomat 是负责把HDFS 中的文件经过一系列处理变成map 函数的输入部分的。这个类做了三件事情:
  第一, 验证输入信息的合法性,包括输入路径是否存在等;
  第二,把HDFS 中的文件按照一定规则拆分成InputSplit,每个InputSplit 由一个Mapper执行;
  第三,提供RecordReader,把InputSplit 中的每一行解析出来供map 函数处理;

MapReduce应用开发人员并不需要直接处理InputSplit,因为它是由InputFormat创建。InputFormat负责产生输入分片并将它们分隔成记录。

InputSplit

我们知道Mappers的输入是一个一个的输入分片,称InputSplit。InputSplit是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

getLength()用来获取InputSplit的大小,以支持对InputSplits进行排序,而getLocations()则用来获取存储分片的位置列表。

public abstract class InputSplit {  
  public abstract long getLength() throws IOException, InterruptedException;  
  
  public abstract   
    String[] getLocations() throws IOException, InterruptedException;  
}  

InputSplit是hadoop定义的用来传送给每个单独的map的数据,InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过InputFormat()来设置。
当数据传送给map时,map会将输入分片传送到InputFormat,InputFormat则调用方法getRecordReader()生成RecordReader,RecordReader再通过creatKey()、creatValue()方法创建可供map处理的一个一个的<key,value>对。
简而言之,InputFormat()方法是用来生成可供map处理的<key,value>对的。

FileinputFormat类

FileinputFormat类是所有使用文件作为其数据源的InputFormat实现的基类。
它提供了两个功能:

  • 定义哪些文件包含在一个作业的输出中
  • 输入文件生成分片的实现。

并把分片分隔成记录的作业由其子类来完成。

FileinputFormat类的输入路径

作业的输入被设定为一组路径,这对限定作业输入提供了很大的灵活性。

<small>一条路径可以表示一个文件、一个目录或是一个glob,即一个文件和目录的集合。值得注意的是,被值得为输入的路径的目录中的内容不会被递归进行处理!</small>

如果需要排除特定文件可以使用FileInPutFormat的SetInputPathFilter()方法设置一个过滤器:


FileInPutFormat类的输入分片

给定一组文件,FileInPutFormat是如何把它们转换为输入分片的?
FileInPutFormat只分割大文件。这里的大是值超过HDFS块的大小。而分片通常与HDFS块大小一样,也可以设置不同的Hadoop属性来改变。


下面是该类对getSplits 方法的实现
利用FileInputFormat 的getSplits方法,我们就计算出了我们的作业的所有输入分片了
注意:每一个输入分片启动一个Mapper 任务。

public List<InputSplit> getSplits(JobContext job
 2                                     ) throws IOException {
 3     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 4     long maxSize = getMaxSplitSize(job);
 5 
 6     // generate splits
 7     List<InputSplit> splits = new ArrayList<InputSplit>();
 8     List<FileStatus>files = listStatus(job);
 9     for (FileStatus file: files) {
10       Path path = file.getPath();
11       FileSystem fs = path.getFileSystem(job.getConfiguration());
12       long length = file.getLen();
13       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
14       if ((length != 0) && isSplitable(job, path)) { 
15         long blockSize = file.getBlockSize();
16         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
17 
18         long bytesRemaining = length;
19         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
20           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
21           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
22                                    blkLocations[blkIndex].getHosts()));
23           bytesRemaining -= splitSize;
24         }
25         
26         if (bytesRemaining != 0) {
27           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
28                      blkLocations[blkLocations.length-1].getHosts()));
29         }
30       } else if (length != 0) {
31         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
32       } else { 
33         //Create empty hosts array for zero length files
34         splits.add(new FileSplit(path, 0, length, new String[0]));
35       }
36     }
37     
38     // Save the number of input files in the job-conf
39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40 
41     LOG.debug("Total # of splits: " + splits.size());
42     return splits;
43   }

那这些计算出来的分片是怎么被map读取出来的呢?就是InputFormat中的另一个方法createRecordReader(),FileInputFormat并没有对这个方法做具体的要求,而是交给子类自行去实现它。

  • RecordReader:
    RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从类图中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
  • 我们再深入看看上面提到的RecordReader的一个子类: Lin eRecordReader。
    LineRecordReader由一个FileSplit构造出来,start是这个FileSplit的起始位置,pos是当前读取分片的位置,end是分片结束位置,in是打开的一个读取这个分片的输入流,它是使用这个FileSplit对应的文件名来打开的。key和value则分别是每次读取的K-V对。然后我们还看到可以利用getProgress()来跟踪读取分片的进度,这个函数就是根据已经读取的K-V对占总K-V对的比例来显示进度的。

其他输入类

  • CombineFileInputFormat类 能够很好的处理小文件
    WholeFileInputFormat类 使用RecordReader将整个文件读为一条记录。

  • TestInputFormat类 Hadoop默认的输入方法,每条记录是一行输入。
    键是LongWritable类型,存储该行在整个文件中的字节偏移量。
    值是这行的内容,不包括任何行终止符(换行符和回车符),是Text类型的。

SequenceFileInputFormat类 顺序文件格式存储二进制的键值对的序列作为MapReduce的输入时使用。

MultipleInputs类 能妥善处理多种格式输入问题。

DBInputFormat 这种输入格式用于使用JDBC从关系数据库中读取数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,784评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,745评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,702评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,229评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,245评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,376评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,798评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,471评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,655评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,485评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,535评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,235评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,793评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,863评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,096评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,654评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,233评论 2 341

推荐阅读更多精彩内容