2021年初的时候,关于Hadoop要退休淘汰的PR文章甚嚣尘上。其中MapReduce思想最为人所诟病,因为其并不友好的写代码方式,高昂的维护成本以及较差的运行效率。
然而MapReduce作为一种编程范式,要被淘汰恐怕也没有那么容易。很多人说你看Spark速度那么快,也很稳定啊,这不是可以淘汰掉Hadoop的MapReduce了吗?是这样吗?所谓的快和慢是根据需求来的,基于RDD的处理比MapReduce优越性也是要高昂的计算成本作为代价的。
一个头部的互联网公司每天的离线调度任务动辄数十万起,这么庞大的基于MapReduce的离线计算如果要用Spark来替代,要付出十分高昂的服务器成本。我们可以说原来用Hadoop MapReduce能做的一些事情被更好更快的其他引擎来替代了,但并不是说MapReduce被淘汰了。而且,Spark中也有Map、Reduce这样的概念哦。
【MapReduce篇】
Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。
说说MapReduce编程模型
MapReduce是一种非常简单又非常强大的编程模型。
简单在于其编程模型只包含map和reduce两个过程,map的主要输入是一对< key , value>值,经过map计算后输出一对< key , value>值;然后将相同key合并,形成< key , value集合>;再将这个< key , value集合>输入reduce,经过计算输出零个或多个< key , value>对。
但是MapReduce同时又是非常强大的,不管是关系代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程来实现。
我们以WordCount程序为例。WordCount主要解决文本处理中的词频统计问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十K到几M的数据,那么写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了,如下图。
MapReduce作业启动和运行机制
我们以Hadoop1为例,MapReduce运行过程涉及以下几类关键进程:
大数据应用进程:启动用户MapReduce程序的主入口,主要指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群。
JobTracker进程:根据要处理的输入数据量启动相应数量的map和reduce进程任务,并管理整个作业生命周期的任务调度和监控。JobTracker进程在整个Hadoop集群全局唯一。
TaskTracker进程:负责启动和管理map进程以及reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器,也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTacker进程。
具体作业启动和计算过程如下:
应用进程将用户作业jar包存储在HDFS中,将来这些jar包会分发给Hadoop集群中的服务器执行MapReduce计算。
应用程序提交job作业给JobTracker。
JobTacker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树。
JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的reduce数目创建相应数量的TaskInProcess。
TaskTracker进程和JobTracker进程进行定时通信。
如果TaskTracker有空闲的计算资源(空闲CPU核),JobTracker就会给他分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据。
TaskRunner收到任务后根据任务类型(map还是reduce),任务参数(作业jar包路径,输入数据文件路径,要处理的数据在文件中的起始位置和偏移量,数据块多个备份的DataNode主机名等)启动相应的map或者reduce进程。
map或者reduce程序启动后,检查本地是否有要执行任务的jar包文件,如果没有,就去HDFS上下载,然后加载map或者reduce代码开始执行。
如果是map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机)。如果是reduce进程,将结果数据写出到HDFS。
3. HDFS中的文件大小设置,以及有什么影响?
HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M。
思考:为什么块的大小不能设置的太小,也不能设置的太大?
HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。
因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。
如果寻址时间约为10ms,而传输速率为100MB/s,为了使寻址时间仅占传输时间的1%,我们要将块大小设置约为100MB。默认的块大小128MB。增加文件块大小,需要增加磁盘的传输速率。
secondary namenode工作机制
1)第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对数据进行增删改查。
2)第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要checkpoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行checkpoint。
(3)NameNode滚动正在写的edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
NameNode与SecondaryNameNode 的区别与联系?
1)区别
(1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
(2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。
2)联系:
(1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。
(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。
ZKFailoverController主要职责
1)健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。
2)会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。
3)当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。
4)master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态。
7.Hadoop 序列化和反序列化及自定义 bean 对象实现序列化
1)序列化和反序列化
(1)序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
(2)反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
(3)Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop 自己开发了一套序列化机制(Writable),精简、高效。
2)自定义 bean 对象要想序列化传输步骤及注意事项:
(1)必须实现 Writable 接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写 toString(),且用"\t"分开,方便后续用
(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序
说说FileInputFormat 切片机制
job 提交流程源码详解
waitForCompletion()
submit();
// 1、建立连接
connect();
// 1)创建提交 job 的代理
new Cluster(getConfiguration());
// (1)判断是本地 yarn 还是远程
initialize(jobTrackAddr, conf);
// 2、提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid,并创建 job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 xml 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
在一个运行的 Hadoop 任务中,什么是 InputSplit?
FileInputFormat 源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件。
(3)遍历第一个文件 ss.txt。
a)获取文件大小 fs.sizeOf(ss.txt);。
b)计算切片大小 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M。
c)默认情况下,切片大小=blocksize。
d)开始切,形成第 1 个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)。
e)将切片信息写到一个切片规划文件中。
f)整个切片的核心过程在 getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block 是 HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到 yarn 上,yarn 上的 MrAppMaster 就可以根据切片规划文件计算开启 maptask 个数。
如何判定一个 job 的 map 和 reduce 的数量?
1)map 数量
splitSize=max{minSize,min{maxSize,blockSize}}
map 数量由处理的数据分成的 block 数量决定 default_num = total_size / split_size;
2)reduce 数量
reduce 的数量 job.setNumReduceTasks(x);x 为 reduce 的数量。不设置的话默认为 1。
Maptask 的个数由什么决定?
一个 job 的 map 阶段 MapTask 并行度(个数),由客户端提交 job 时的切片个数决定。
MapTask 和 ReduceTask 工作机制(也可回答 MapReduce 工作原理)
MapTask 工作机制
(1)Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。
(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask 工作机制
(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
(4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
说说 MapReduce 有几种排序及排序发生的阶段
1)排序的分类:
(1)部分排序:
MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
(2)全排序:
如何用 Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建 3 个分区,在第一分区中,记录的单词首字母 a-g,第二分区记录单词首字母 h-n, 第三分区记录单词首字母 o-z。
(3)辅助排序:(GroupingComparator 分组)
Mapreduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
(4)二次排序:
在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。
2)自定义排序 WritableComparable
bean 对象实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
3)排序发生的阶段:
(1)一个是在 map side 发生在 spill 后 partition 前。
(2)一个是在 reduce side 发生在 copy 后 reduce 前。
说说MapReduce 中 shuffle 阶段的工作流程,如何优化 shuffle 阶段
分区,排序,溢写,拷贝到对应 reduce 机器上,增加 combiner,压缩溢写的文件。
说说MapReduce 中 combiner 的作用是什么,一般使用情景,哪些情况不需要,及和 reduce 的区别?
1)Combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量。
2)Combiner 能够应用的前提是不能影响最终的业务逻辑,而且,Combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。
3)Combiner 和 reducer 的区别在于运行的位置。
Combiner 是在每一个 maptask 所在的节点运行;
Reducer 是接收全局所有 Mapper 的输出结果。
如果没有定义 partitioner,那数据在被送达 reducer 前是如何被分区的?
如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key 的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号“。
MapReduce 怎么实现 TopN?
可以自定义 groupingcomparator,对结果进行最大值排序,然后再 reduce 输出时,控制只输出前 n 个数。就达到了 topn 输出的目的。
Hadoop 的缓存机制(Distributedcache)
分布式缓存一个最重要的应用就是在进行 join 操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行 map 端的连接操作,经过我的实验验证,这种情况下处理效率大大高于一般的 reduce 端 join,广播处理就运用到了分布式缓存的技术。
DistributedCache 将拷贝缓存的文件到 Slave 节点在任何 Job 在节点上执行之前,文件在每个 Job 中只会被拷贝一次,缓存的归档文件会被在 Slave 节点中解压缩。将本地文件复制到 HDFS 中去,接着 Client 会通过 addCacheFile() 和 addCacheArchive()方法告诉 DistributedCache 在 HDFS 中的位置。当文件存放到文地时,JobClient 同样获得 DistributedCache 来创建符号链接,其形式为文件的 URI 加 fragment 标识。当用户需要获得缓存中所有有效文件的列表时,JobConf 的方法 getLocalCacheFiles() 和 getLocalArchives()都返回一个指向本地文件路径对象数组。
如何使用 mapReduce 实现两个表的 join?
1)reduce side join : 在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了区分两种来源的 key/value 数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件 File1,tag=2 表示来自文件 File2。
2)map side join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个 map task 内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录 key/value,在 hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。
什么样的计算不能用 mr 来提速?
1)数据量很小。
2)繁杂的小文件。
3)索引是更好的存取机制的时候。
4)事务处理。
5)只有一台机器的时候。
MapReduce3.0做了哪些改进?
1.Tasknative优化:为MapReduce增加了C/C++的map output collector实现(包括Spill,Sort和IFile等),通过作业级别参数调整就可切换到该实现上。对于shuffle密集型应用,其性能可提高约30%。
2.MapReduce内存参数自动推断。在Hadoop 2.0中,为MapReduce作业设置内存参数非常繁琐,涉及到两个参数:mapreduce.{map,reduce}.memory.mb和mapreduce.{map,reduce}.java.opts,一旦设置不合理,则会使得内存资源浪费严重,比如将前者设置为4096MB,但后者却是“-Xmx2g”,则剩余2g实际上无法让java heap使用到。
3.Hadoop3.x中的MapReduce添加了Map输出collector的本地实现,对于shuffle密集型的作业来说,这将会有30%以上的性能提升。
【HDFS篇】
HDFS 中的 block 默认保存几份?
默认保存3份
HDFS 默认 BlockSize 是多大?
从2.7.3版本开始,官方关于Data Blocks 的说明中,block size由64 MB变成了128 MB的。
负责HDFS数据存储的是哪一部分?
DataNode负责数据存储
HDFS有哪些优点
1、高容错性
数据自动保存多个副本
副本丢失后,自动恢复
2、适合批处理
移动计算而非数据
数据位置暴露给计算框架
3、适合大数据处理
GB、TB、甚至PB级数据
百万规模以上的文件数量
10K+节点规模
4、流式文件访问
一次性写入,多次读取
保证数据一致性
5、可构建在廉价机器上
通过多副本提高可靠性
提供了容错和恢复机制
HDFS缺点
1、低延迟数据访问
比如毫秒级-达不到
低延迟与高吞吐率
2、小文件存取
占用NameNode大量内存
寻道时间超过读取时间
3、并发写入、文件随机修改
一个文件只能有一个写者
仅支持append
HDFS访问方式有哪些
HDFS Shell命令
HDFS Java API
HDFS REST API
HDFS Fuse:实现了fuse协议
HDFS lib hdfs:C/C++访问接口
HDFS 其他语言编程API
使用thrift实现
支持C++、Python、php、C#等语言
如何增加和移除DataNode节点
1、加入新的datanode
步骤1:将已存在datanode上的安装包(包括配置文件等)拷贝到新datanode上
步骤2:启动新datanode
sbin/hadoop-deamon.sh start datanode
2、移除旧datanode
步骤1:将datanode加入黑名单,并更新黑名单,在NameNode上,将datanode的host或者ip加入配置选项dfs.hosts.exclude指定的文件中
步骤2:移除datanode
bin/hdfs dfsadmin -refreshNodes
HDFS快照的作用以及如何设置
1、HDFS上文件和目录是不断变化的,快照可以帮助用户保存某个时刻的数据
2、HDFS快照的作用
防止用户误操作删除数据
数据备份
3、一个目录可以产生快照,当且仅当它是Snapshottable
bin/hdfs dfsadmin allowSnapshot
4、创建/删除快照
bin/hdfs dfs -createSnapshot[]
bin/hdfs dfs -deleteSnapshot[]
5、快照存放位置和特点
快照是只读的,不可修改
快照位置:
/.snapshot
/.snapshot/snap_name
谈谈HDFS的缓存
1、HDFS自身不提供数据缓存功能,而是使用OS缓存
容易内存浪费,eg.一个block三个副本同时被缓存
多种计算框架共存,均将HDFS作为共享存储系统
MapReduce:离线计算,充分利用磁盘
Impala:低延迟计算,充分利用内存
Spark:内存计算框架
2、HDFS应让多种混合计算类型共存一个集群中
合理的使用内存、磁盘等资源
比如,高频访问的特点文件应被尽可能长期缓存,防止置换到磁盘上
3、用户需通过命令显式的将一个目录或文件加入/移除缓存
不支持块级别的缓存
不支持自动化缓存
可设置缓存失效时间
4、缓存目录:仅对一级文件进行缓存
不会递归缓存所有文件与目录
5、以pool的形式组织缓存资源
借助YARN的资源管理方式,将缓存划分到不同pool中
每个pool有类linux权限管理机制、缓存上限、失效时间等
6、独立管理内存,未与资源管理系统YARN集成
用户可为每个DN设置缓存大小,该值独立于YARN
HDFS的存储机制
HDFS存储机制,包括HDFS的写入数据过程和读取数据过程两部分
HDFS写数据过程
1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
2)NameNode返回是否可以上传。
3)客户端请求第一个 block上传到哪几个datanode服务器上。
4)NameNode返回3个datanode节点,分别为dn1、dn2、dn3。
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1、dn2、dn3逐级应答客户端。
7)客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3;
dn1每传一个packet会放入一个应答队列等待应答。
8)当一个block传输完成之后,客户端再次请求NameNode上传第二个block的服务器。(重复执行3-7步)。
HDFS读数据过程
1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位来做校验)。
4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。
MapReduce性能优化
Map端优化
通过单个map的写入文件大小和任务处理时间得出
发生大量溢写时会产生性能问题和读取过载,比较Map output records < Spilled Records
需要精确分配内存缓冲区
二进制文件和压缩文件本质上不基于块,因此不能拆分
小文件会产生大量并行任务来处理,会浪费很多资源
处理小文件的最好方法是打包为大文件
使用Avro对数据序列化来创建容器文件
使用HAR格式文件
使用序列文件把小文件存储成单个大文件
如果数据集很大但数据块很小会导致mapper过多,需要花时间进行拆分;因此输入文件大则数据块大小也要加大
大的数据块会加速磁盘IO,但会增加网络传输开销,因而在Map阶段造成记录溢写
Map任务的流程
输入数据和块大小的影响
处置小文件和不可拆分文件
在Map阶段压缩溢写记录
计算Map任务的吞吐量
Reduce端优化
压缩排序和合并的数据量(combiner,数据压缩,数据过滤)
解决本地磁盘问题和网络问题
最大化内存分配以尽可能把数据保留在内存而不是输出到磁盘
造成Reduce低速的原因可能是未经优化的reduce函数,硬件问题或者不当的Hadoop配置
通过输入Shuffle除以Reduce运行时间得到吞吐量
Reduce任务的流程
计算Reduce吞吐量
改善Reduce执行阶段
MapReduce任务级别优化
(1)使用Combiner
类似于本地Reduce操作,可以提升全局Reduce操作效率
习惯上一般直接把reduce函数当做Combiner,逻辑需满足交换律和结合律
Combiner会在Map函数生成的键值对收集到列表,并经过Combiner运算直到Combiner缓冲区达到一定数目时,才会发送给reduce。因此在数据量非常大的情况下可以很好的改善性能
(2)使用压缩技术
输入压缩:在有大量数据且计划重复处理时,应考虑输入压缩。Hadoop会自动对合适扩展名的文件启用压缩和解压
压缩Mapper输出:当map任务中间数据量大时,应考虑在此阶段启用压缩。能改善Shuffle过程,降低网络开销
压缩Reducer输出:可以减少要存储的结果数据量,同时降低下游任务的输入数据量
如果磁盘IO和网络影响了MR作业性能,则在任意阶段(压缩输入,Mapper或Reduce输出)启用压缩都可以改善处理时间,减小IO和网络开销
(3)使用正确的Writable类型
通过使用FileInputFormat实现原始字节比WriteableComparable更有优势
使用Text而不是String来消除字符串拆分所花的时间
使用VIntWritable或者VLongWritable有时比使用int和long更快
在代码中使用正确的可写类型能提高MR作业整体性能
在Shuffle和Sort阶段,中间键的比较可能会成为瓶颈