1 简介
MapReduce就是用map/reduce原语来在小型机集群上分治执行函数式任务的解决方案
2 编程用途
例子
word count
map(String key, String value):
//key:document name
//value:document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
分布式正则 Distributed Grep
统计URL访问频率 Count of URL Access Frequency
反转网络连接图 Reverse Web-Link Graph
Term-Vector per Host
反转索引 Inverted Index
分布式排序 Distributed Sort
3 实现
基于:
- 商用级小型机x86处理器跑linux系统每台机器2-4GB内存
- 商用级网络硬件,100MB/s
- 每个集群成百上千的普通错误率机器(不是昂贵硬件的正确机器)
- 存储在普通串口硬盘上搭建的,能在不可靠硬件上提供可靠性的GFS文件系统
- 用户提交job到调度系统
实现概览
- 输入数据被分成M份(通常是16到64MB),输出数据为R份
map worker 负责处理数据 ,把M份input file变成M份中间数据
reduce worker 负责整理数据 ,把M份中间数据变成R份output file - 特殊的master节点,选取空闲的worker进行map/reduce操作
- 被分配map操作的worker把本机中的key/value格式的输入通过用户定义的map函数计算出R份key/value格式的中间数据
- map worker处理完数据后,reduce worker被master通知R份中间数据的位置之后,读取它然后按照中间数据的key排序
- reduce worker遍历排好序的中间数据,按照用户定义的reduce方法处理数据然后放在本机
- 所有的map/reduce操作结束之后,MapReduce通过网络整理数据输出给用户
master存储的数据结构
- 所有的map/reduce worker的状态(空闲、工作中、完成)
- 所有的map/reduce worker的身份
- 每个map上的R份中间数据的位置和大小,并且在map操作完成之后更新他们。因为master起到一个导管的作用
3 容错
worker节点错误
重启worker完成任务,一段时间后启动一个其他worker完成这项任务
master节点错误
如果master错误,可以恢复到上一个检查点
如果master死亡,客户端重启整个MapReduce任务
错误出现的语义
当map/reduce操作都是函数式的任务时,MapReduce能保证无障碍输出
在map/reduce操作未完成时,结果写在本机的临时文件
map完成之后,把临时文件改名为中间文件,发送一个中间文件位置大小的消息给master
reduce完成之后,把临时文件改名为最终文件。如果多个一样的reduce任务处理了同一份数据,会产生多个同名的最终文件,我们通过原子性的重命名操作来保证最终的文件系统中只包含一份该reduce输出文件
在map/reduce操作都具有确定性时,MapReduce系统会得到和顺序执行一样的结果
存储位置
对于系统来说网络通信是比较稀缺的资源
通过把初始数据存放在map worker的具有多副本容错(通常是3个)的本机GFS上节省带宽
如果map worker错误之后会寻找距离这份GFS上数据replica近的机器完成map操作
任务粒度
M个map操作和R个reduce操作的时间复杂度是O(M+R)
master节点要做出O(M+R) 次调度,存储O(M*R) 种系统状态
此外,R因为是用户使用的输出文件所以一般会小。在实际中我们让每个map worker存放16-64MB的数据来决定M,然后让R是一个比较小的数
我们经常让MapReduce计算设置M=20w,R=5k,2k个worker
后备方案
通常让执行时间长的一个原因是有一个使用了不寻常超长的时间来完成任务的worker,比如用了一个物理损坏的硬盘让读取速度从30MB/S降低到1MB/S ,这样让时间超长的原因可能有很多:CPU、内存、磁盘、网络
解决这个问题的通用方法是,在整个MapReduce任务快要结束时,后备执行还在执行的任务,当原有节点或者后备节点执行完之后结束这个map/reduce操作,这个方法可以显著提升效率,在google的实验中是44%
4 细化
尽管map/reduce操作足够大多数场景,我们还是发现有一些效果不错的扩展
更换切分数据
通用的切分中间数据的方法是hash(key) mod R
有些情况比如输出的key是URL时,我们想让同一个主机的数据在一个输出文件中,就可以用hash(hostName(key)) mod R的方法
排序
对中间文件排序有助于让输出文件有序
合并
有时,中间文件的key会大量重复,比如你对句子做word count,很容易出现<the,1>这样的中间文件,我们可以让用户指定一个可选的合并器,在reduce取数据之前对他们进行合并
合并器有点像reduce操作,区别在于合并器向中间文件输出,reduce操作向输出文件输出
输出输出方式
从数据库或者内存中排列好的K/V数据读取输入文件很容易
文本文件的通常key是句号隔开的句子的序号
副作用
由于保证执行结果的正确,需要用户保证map/reduce写文件的原子性(比如想要向多个输出文件写入时)
跳过坏结果
有时候,一些无法解决的错误,比如第三方库的错误,无法解决,需要我们跳过
map/reduce worker在开始执行操作之前都设置了本机全局变量来捕获异常和错误,遇到错误UDP发送该操作序列号到master来跳过此任务
本地执行
为了方便debug/小型测试,MapReduce库支持本地顺序执行程序
状态信息
master节点上跑了一个HTTP服务器,可以导出各个worker的状态让用户看,包括输入文件大小、输出文件大小、中间文件大小、处理效率等
状态信息可以方便用户估计任务完成的时间并且还可以知道哪些worker坏了
计数器
当前计数器也是master节点状态信息的一部分,比如用户可能需要保证输出文件的数量,或者保证某一类文件在输出文件中的比例
5 表现
6 经验
map/reduce被用于:
- 大规模机器学习
- google广告的聚类
- 大规模网络爬虫
- 生成报告
大规模索引
使用MapReduce库让编程过程中不需要再手动处理分布式、容错、并行计算
并且可以隔离计算机底层