数据切片和MapTask并行度决定机制
1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
2)每一个Split切片分配一个MapTask并行实例处理
3)默认情况下,切片大小=BlockSize
4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
FileInputFormat切片源码解析
1)程序先找到你数据存储的目录
2)开始遍历处理目录下的每一个文件
3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt)
b)计算切片大小
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默认情况下切片大小=blocksize块大小
d)开始切片,ss.txt=300M
第一个切片:ss.txt——0:128M
第二个切片:ss.txt——128:256M
第三个切片:ss.txt——256:300M
(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成
g)InputSplit只记录了切片的元数据信息,比如起始位置,长度以及所在节点列表等
4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
FileInputFormat切片大小的参数配置
1)源码中计算切片大小的公式
Math.max(minSize, Math.min(maxSize, blockSize))
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值为Long的最大值
2)切片大小设置
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
minsize(切片最小值):参数调得比blockSize大,则可以让切片变得比blockSize还大
3)获取切片信息API
//获取切片的文件名称
String name=inputSplit.getPath().getName();
//根据文件类型获取切片信息
FileSplit inputSplit=(FileSplit)context.getInputSplit();
CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件就会产生大量的MapTask,处理效率极其低下
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理
虚拟存储切片最大值设置:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// 虚拟存储切片最大值设置20m = 20971520
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); //4m
切片机制:
FileInputFormat实现类
TextInputFormat 处理文本
KeyValueTextInputFormat 处理键值对
NLineInputFormat 按行处理
CombineTextInputFormat 处理小文件
TextInputFormat
是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");来设定分隔符。默认分隔符是tab(\t)。
NLineInputFormat
如果使用NLineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整数,切片数=商+1。
自定义InputFormat
步骤:
1)自定义一个类继承FileInputFormat
2)改写RecordReader,实现一次读取一个完整文件封装为KV
3)在输出时使用SequenceFileOutPutFormat输出合并文件
MapReduce工作流程
Map阶段
1)客户端形成任务切片数
2)客户端提交切片信息(.split) 配置信息(.xml)和.jar
3)Yarn根据切片数启动相应的MapTask
4)MapTask中读取客户端提交的数据,默认使用TextInputFormat按行读取,然后在MapTask中进行逻辑运算并写出
5)MapTask写出到环形缓存区,环形缓存区对数据进行分区排序(字典顺序,快排),环形缓存区默认100M,写到80%时将环形缓存区数据溢出到文件(分区且区内有序),然后在继续回写环形缓存区
6)对溢出的文件根据分区归并排序
7)Combiner合并
案例:
Reduce阶段
1)所有MapTask任务完成后,启动相应数量的ReduceTask(根据分区数决定启动几个ReduceTask)
2)下载MapTask归并排序后的数据到本地磁盘,如果数据小则直接放到缓存中
3)合并下载的数据归并排序
4)分组排序
案例:
WritableComparable排序案例实操(区内排序)
5)将数据按key相同的原则(一次读取一组)传入Reduce方法中
6)写出到Part-r-0000000等文件
Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
Partition分区
默认Partition分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定义Partitioner分区
需求:根据手机号前三位输出到不同文件中
1)自定义类继承Partitioner,重写getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int i) {
// key 是手机号
// value 流量信息
// 获取手机号前三位
String prePhoneNum = key.toString().substring(0, 3);
int partition = 4;
if ("136".equals(prePhoneNum)) {
partition = 0;
} else if ("137".equals(prePhoneNum)) {
partition = 1;
} else if ("138".equals(prePhoneNum)) {
partition = 2;
} else if ("139".equals(prePhoneNum)) {
partition = 3;
}
return partition;
}
}
2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(ProvincePartitioner.class);
3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
排序
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
排序分类
1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序
2)全排序
最终输出结果只有一个文件,且文件每部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接受key为Bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序
4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序
案例:
WritableComparable排序案例实操(区内排序)
Combiner合并
1)Combiner是MR程序中Mapper和Reducer之外的一种组件
2)Combiner组件的父类就是Reducer
3)Combiner和Reducer的区别在于运行的位置:
Combiner是在每一个MapTask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果;
4)Combiner的意义就是对每一个 MapTask的输出进行局部汇总,以减小网络传输量
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来
案例:
设置ReduceTask并行度(个数)
//驱动类中加入以下代码即可
//默认值是1 手动设置为4
job.setNumReduceTasks(4);
注意:
1)ReduceTask如果为0,表示没有Reduce阶段,输出文件个数和Map个数一致
2)ReduceTask默认值就是1,所以输出文件个数为一个
3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个ReduceTask
5)具体多少个ReduceTask,需要根据集群性能而定
6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程?答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行
OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口
1.文本输出TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把他们转换为字符串
2.SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩
Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
Reduce Join缺点及解决方案
缺点:这种方式中,合并的操作是在Reducer阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方案:Map端实现数据合并
Map Join
使用场景:Map join适用于一张表十分小,一张表很大的场景
问:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
答:在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜
计数器应用
Hadoop为每个作业维护若干个内置计数器,以描述多项指标。例如:某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。
1.计数器API
1)采用枚举方式统计计数
enum MyCounter {MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
2)采用计数器组,计数器名称的方式统计
context.getCounter("counterGroup", "counter").increment(1);
3)计数结果在程序运行后控制台上查看
数据清洗(ETL)
在运行核心业务MapReducer程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理过程往往只需要运行Mapper程序,不需要执行Reducer程序。