概要
MapReduce 是一个处理和生成大数据集的程序模型和相关实现。用户定义一个 map
函数:处理一个 key/value 对生成一组中间键值对
,和一个Reduce
函数:合并关联相同 key 的所有中间值数据。许多现实世界的任务可以用这个模型来表达,就像这篇论文里描述的那样。
写成函数式风格的程序可以自动地在大规模集群上并行处理。运行时系统只关心分割输入数据的细节,跨多个工作机器调度程序的执行,处理机器失败,以及管理需要的机器间的交互。这使得程序员不需要任何并行和分布式系统的经验也同样可以使用大规模集群的资源。
我们 MapReduce 的实现运行在一个大规模商业机器集群上具有很高的扩展:一个经典的 MapReduce 计算在成千上百台机器上处理 TB 级别的数据。开发者发现这个系统很好用:已经实现了上百个 MapReduce 程序,超过一千个 MapReduce 任务运行在 Google 的集群上。
1. 介绍
在过去的五年,作者以及在 Google 的其他同事实现了上百个特定需求的计算程序用来处理大规模的原生数据,例如爬取的文档,web 请求日志,等等。计算多种形式的派生数据,例如倒排索引,多种形式的 web 文本图结构,从每个主机爬取文档的数量,给定日期最常访问的请求等等。大多数这样的计算在概念上是直接的。可是,输入数据通常是巨大的,为了在合理时间完成计算程序不得不分布在几百或几千台机器上执行。如何并行化计算、分布数据和处理故障等问题,使用大量复杂代码来处理这些问题使得原始简单计算变得模糊起来。
作为一个对这种复杂性的应对,我们设计了一种新的抽象:允许我们使用简单的时间去执行,把并行化、分布式、容错以及负载均衡的复杂逻辑封装到一个库里。我们的抽象是由 Lisp 中的 map
和 reduce
原型以及其他函数式函数启发的。 我们意识到我们的大部分计算都包括一个 map
函数从输入里每个 逻辑
记录计算出一些中间的 键值对
,然后使用一个 reduce
函数处理相同 key
关联的所有值,为恰当地将派生数据数据合并。用户指定 map
和 reduce
函数的函数式模型使用使得大规模计算并行化变得简单,并且使用重试作为容错的默认机制。
这项工作的主要贡献在于:一个简单而又强大的接口, 它实现了大规模计算的自动并行以及分布式,结合这个接口的实现可以在大规模 PC 机器上取得巨大的性能。
第 2 部分主要介绍了基本的编程模型以及给了几个例子。第 3 部分描述了一个 MapReduce 的实现:专门为我们的以集群为基础的计算环境而定制的。第 4 部分我们介绍了一个关于程序模型我们发现的几个有用的改善点。第 5 部分介绍了针对不同任务的实现的性能指标。第 6 部分探讨了在 Google 内部 MapReduce 的使用:使用它作为基础重写生产索引系统的经验。第 7 部分讨论了相关的和未来的工作。
2 编程模型
模型:获取一组 key/value 的输入
,并输出
一组 key/value 集合。
MapReduce 库的使用者将这个计算表达成两个函数:Map 和 Reduce。
Map, 由使用者编写,获得一个 key/value 输入对,产生出一组 中间的
key/value 对。MapReduce 库按相同中间 key I 对所有中间值进行分组然后把他们发送给 Reduce 函数。
Reduce 函数,也是由使用者编写,接受一个中间 key I 和这个 key 对应的 value 集合。它把这些 value 合并成可能是更小点的 value 集合。通常每次 Reduce
调用只会返回一个或 0 个 value。这些中间 value 是通过迭代器提供给 Reduce
函数的。这允许我们处理超过内存大小的 value 集合。
2.1 例子
思考一个关于从一个巨大的文档集合里计算每一个词的出现次数。使用者可能会编写出如下的伪代码:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map
函数生成一个单词及其出现的次数(在例子里刚好是 1
)。Reduce
函数则累加每一个单词所出现的次数。
另外,使用者使用输入输出文件的名称和一些可选的调优参数来构建 mapreduce
对象。使用者然后调用 MapReduce 函数把 mapreduce
对象传给它。这样用户代码和 MapReduce 库(由 C++ 实现)就联系在一起。附录 A 包含了这个例子的所有代码。
2.2 类型
尽管前面的伪代码是字符串类型输入输出相关的,但从概念上讲 用户提供的 map 和 reduce 函数在类型上是有关联的:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
I.e.,输入 key 和 value 与输出 key 和 value 来自不同的领域。此外,中间的 key 和 value 和 输出 key 和 value 属于相同域。
我们 C++ 实现在用户自定义函数之间传递字符串,让用户代码去处理字符串和相关类型的之前转换。
2.3 更多的例子
这里有些有趣的例子,他们可以很容易地用 MapReduce 函数表达。
分布式过滤:map 函数如果输入的某行和某个模式(pattern)匹配则返回这行数据。 reduce 函数是个恒等函数只是把提供的中间数据复制到输出文件中。
URL 访问的次数: map 函数处理 web 网页请求日志输出 <URL,1>
。reduce 函数累加所有具有相同 URL 的键值对的所有值,生成一个 <URL, 总数> 对。
翻转 Web 链接图:map 函数为每个从名叫 source
页面中找到的 target
生成一个 <target, source>
对。reduce 函数给定 target
URL 的所有 source
URL 拼接在一起然后生成: <target, list(source)>
。
每台主机的检索词向量:一个检索词向量 (term vector) 统计了统计了出现在一个/组文档里最重要的检索词集合:<word, frequency>
列表。map 函数输入一个文档生成 <hostname, term vector>
(hostname 从文档的 URL 中提取出来的)。reduce 接收一个给定主机(host) 所有的每个文件检索词向量,它把这些检索词向量累加,丢掉不常见的检索词,然后生成最终的 <hostname, term vector>
对。
倒排索引:map 函数解析每个文档然后生成一系列<word, documentId>
对。reduce 函数接受一个单词的所有 kv 对,然后按照相关的 documentId 排序生成一个 <word, list(document ID)>
kv对。所有的输出就变成了一个简单的倒排索引。它很容易加速计算单词的位置(文档ID)。
分布式排序:map 函数从每个单词中抽取出 key ,然后生成一个 <key, word>
kv 对。reduce 函数是个恒等函数:接受 <key, word>
集合 返回它。这个计算依赖 4.1 节提到的分割机制以及 4.2 节 提到的排序属性。
3 实现
MapReduce 的很多不同实现都是可能的,没有最好的方式只有适合的。例如某个实现可能适合小的共享内存机器,另一个适合大型 NUMA 多处理器机器,然而另外一个适合基于网络的巨型集群。
这部分介绍的实现针对 Google 里广泛使用的计算环境:通过交换机关联的巨型 PC 机集群。在我们的环境里:
(1) 机器大多是双核 x86 处理器,运行着 linux,每台机器大约有 2-4 GB 内存。
(2) 使用商业网络硬件,机器级别通常是 100 Mb/s 或者 1 G Mb/s 带宽,但平均到每台机器就很少了。
(3)存储通常就直接用 每个机器自带的廉价的 IDE 磁盘。一个内部研发的文件管理系统【8】就是用来管理存储在这类磁盘上的数据。这个管理系统使用备份机制来提供可用性和可靠性在这些可靠的硬件环境上。
(4)用户把任务提交给调度系统。多个任务组成一组任务,然后由调度系统分配给集群中可用的机器。
3.1 执行预览
Map 调用在多机器间分布式执行通过自动地将输入数据分成
M 份。数据分割可以被不同机器并行执行。Reduce 执行也是分布式的通过将中间 key 空间分割成 R 份使用分区函数(eg:hash(key) mod R)。分成多少份(R)以及分区函数由用户自己定义。
图 1 展示了我们的 MapReduce 实现的执行全流程。当用户程序调用
MapReduce
函数后,将会按照下面的执行序列进行(图中的数字标签对应下面列表中的数字):
- 用户程序中的 MapReduce 库首先将输入数据分成 M 份,每份通常是 16 MB 到 64 MB 之间(通过一个可选项让用户控制)。然后开始将用户程序复制给集群的其它机器。
- 用户程序的多个拷贝其中之一是特殊的 - master。剩下的是由 master 分配任务的 worker。由 M 个 map 任务和 R 个 reduce 任务需要分配。master 选择空闲的机器列表,然后给他们每个分配一个 map 任务或者一个 reduce 任务。
- 分配到 map 任务的 worker 读取对应的那份输入数据。它会从输入数据解析出 key/value 对集合,然后将每个 k/v 对传给用户定义的
Map
函数。中间的 key/value 对集合由 Map 函数产生并缓冲在内存中。 - 缓冲的 k/v 对定期地写入磁盘,被分区函数分成 R 份。这些缓冲的 k/v 对在磁盘的位置会被传给 master ,它会将这些位置传给运行 reduce worker机器。
- 当一个 reduce worker 通过 master 接到了这些通知,它使用远程调用从 map worker 机器上的磁盘读取缓冲的 k/v 对。当一个 reduce worker 读完了所有的中间数据,它会按照 key 对中间数据进行排序这样相同 key 的数据会出现在一起。这个排序是需要因为通常许多不同的 key 会分配给一个 reduce 任务。如果中间数据太大而超过内存限制则使用外部排序。
-
Reduce
worker 便利排序好的中间数据,对于遇到的每个中间 key,它会把这个 key 及其对应的中间值集合传给用户定义的Reduce
函数。Reduce
函数的输出添加到这个 reduce 所属分片的那个输出文件里。 - 当所有的
map
和reduce
任务完成后,master 会唤醒用户程序。此时,用户程序里的MapReduce
调用将会返回到用户代码。
成功执行之后,mapreduce 的执行结果会被放在 R 个输出文件中(每个 reduce 任务一个,文件名称由用户指定)。
通常地,用户不需要合并这个 R 份文件成一个-他们会把这些输出文件传递给另一个 MapReduce
函数,或者从另一个分布式系统中使用它们,这个分布式系统可以处理分割成多个文件的输入。
3.2 master 数据结构
master 保存了几个数据结构。对于每个 map 和 reduce 任务,它保存状态(idle(空闲) ,in-process(处理中) 或者 completed(已完成)),以及 worker 的ID (针对非空闲任务)。
master 是从 map 任务到 task 任务传递中间文件区间位置的管道。因此,对于每个完成的 map 任务,master 存储着由 map 任务提供的 R 份中间文件区间的位置和大小。当 map 任务结束的时候通过传过来的信息更新这些位置和大小。这些信息增量地推送给执行中的
reduce 任务。
3.3 容错
因为 MapReduce
库设计出来是帮助在成千上百台机器上处理海量数据的,因此这个库必须优雅地容忍机器宕机等失败。
Worker 失败(Worker Falure)
master 定期 ping 每一个worker。如果一个 worker 在指定的时间内没有响应,master 会把这个 worker 标记为失败。任何一个通过 该 worker 完成的 map 任务会把状态设置回初始的 initial
状态,这样他们就可以被调度器分布给其它 worker 了。类似地,任何进行中的 map 和 reduce 任务运行在一个失败的 worker 上都会设置回 initial
状态,然后可以被重新调度。
一个失败 worker 完成的 map 任务需要重新执行是因为它的输出是存储在失败机器的本地磁盘上——不能访问到。完成的 reduce 任务不需要重新执行是因为它的输出文件存储在一个全局的文件系统里。
当一个 map 任务首先在 worker A 上执行然后再 worker B 上执行(因为 A 挂了),所有执行 reduce 任务的 worker 会被通知到这个重新执行。任何一个还没有从 worker A 读数据的 reduce 任务将会从 worker B 读数据。
MapReduce 是能容忍大规模 worker 失败的。例如,由于网络维护导致一组 80 台机器在同一时间不能够被访问到并持续了几分钟。MapReduce master 只是简单地重新执行下这些不可达的 woker 所执行的那些任务,然后继续向前执行,最终完成这个 MapReduce 函数。
Master 挂了
可以很简单的将上述提到的 master 数据结构定期做好快照。如果 master 任务挂了,新的 master 马上启动起来,状态从最新的快照复制一份。考虑到只有一个 master , 它的失败是不受欢迎的,因此我们现在的实现会中断 MapReduce 计算执行当 master 挂了。客户端可检测到这种信号,他们可以按照自己的意愿决定是否重试。
失败存在时的语义
当用户提供的 map
和 reduce
操作是输入值的确定函数,则我们的分布式实现产生的结果和整个程序串行执行产生的结果一致。
我们依赖 map 和 reduce 任务输出的原子提交来实现这个特性。每个进行中的任务会把它的输出写到私有的临时文件中。一个 reduce 任务产生一个这样的文件,一个 map 任务产生 R 个这样的文件(一个 reduce 任务一个)。当一个 map 任务完成时,worker 会发送一个信息给master, 信息中包含 这 R 个临时文件的名字。如果 master 接收到一个已经完成的 map 任务的信息它会忽略掉的。否则它会将这 R 个文件的名称记录在一个 master 数据结构中。
当一个 reduce 任务完成时,reduce worker 将它临时输出文件名称原子地重命名成最终的输出文件。如果相同的 reduce 任务在多个机器上执行,多个重命名调用会作用到一个相同的最终输出文件。我们依赖的这个原子重命名操作由底层系统提供:保证最终的文件系统状态只包含 reduce 任务一次执行产生的数据。
我们的大部分 map
和 reduce
操作时确定性的(输入完全由输入决定,同样地输入一定由同样输出)。在这种场景中我们的语义和串行执行的是一样的,这很容易让程序员推出他们程序的行为。当 map
或 reduce
函数不是确定性的时候,我们提供了比较弱点的但仍旧合理的语义。在不确定的操作(map 和 reduce)中,一个特定的 reduce 任务 R1 的输出和整个程序的串行执行结果一致。 可是对一个不同的 reduce 任务 R2 的输出可能会和整个程序不同顺序执行的结果一致。
给定 map 任务 M 以及 reduce 任务 R1 和 R2。e(Ri) 代表 Ri 的执行。 e(R1) 可能读取 M 的一个执行产生的输出, e(R2) 可能读取 M 的另一个不同执行的输出,弱语义出现了。
3.4 本地化
网络带宽在我们的计算环境里是相对稀缺的资源。我们通过把输入数据存储在组成集群的机器磁盘上来节省带宽。GFS 把每个文件分成 多个大小为 64 MB 的 block(块),在不同机器上保存多个 block 的备份(通常为 3 份)。MapReduce master 获取输入文件的位置信息然后尝试将 map 任务调度到包含对应输入数据的机器。 如果失败了,它会尝试将 map 任务调度到靠近任务数据备份所在机器(eg: 共享相同交换机的两个机器)的机器上。当在一个集群的大部分机器上运行 大规模 MapReduce 处理,大部分的输入数据是从本地读取的不消耗任何网络。
3.4 任务粒度
如上所述,我们把 map 阶段分为 M 份,reduce 阶段分为 R 份。理想地, M 和 R 应该远大于 worker 机器的数量。这样每个机器可能执行许多不同的任务提高动态负载均衡以及当一个 worker 挂了的时候还可以提升恢复速度:在这个失败 worker 上已经完成的任务可以快速广播到别的机器。
在我们的实现里 M 和 R 有多大都是有一些客观限制,因为 master 必须做 O(M + R) 次调度以及保存 O(M * R) 个状态在内存里。
更进一步,R 的值通常由用户指定,因为每个 reduce 任务的输出是放在一个单独的输出文件里的。在实践中,我们趋向于选择一个 M 可以把输入数据分成每份大小在 16 MB 和 64 MB 之间(因此本地存储优化才能发挥最大作用),我们使 R 是我们使用 worker 节点数量的小倍数 。我们执行 MapReduce 计算时,经常使 M = 200,000 和 R =5,000,使用 2,000 台 woker 机器。
3.6 备份任务
延长一个 MapReduce 操作的耗时的常见原因之一是 “落伍者”:一个机器用不正常长的时间完成计算中剩余少量的 map 任务或者 reduce 任务。"落伍者"出现的原因有很多。例如,一个机器的磁盘坏了可能时不时地将读性能从 30 MB/s 降到 1 MB/s。集群调度系统可能已将其他的任务调度到这台机器,导致它执行 MapReduce 代码很慢因为 CPU,内存,本地磁盘以及网络带宽的竞争。我遇到的一个最近的问题: 在机器初始化代码里有个 bug 会导致处理器缓存失效:这些机器上的计算速度比正常的慢了超过一百倍。
我们有一个通用的机制来缓解落伍者的问题。当一个 MapReduce 操作接近完成时,master 启动剩余进行中任务的备份。无论主任务还是副任务完成该任务就标记为完成。我们调整了这个机制让它即使会增加资源的使用但也只会多用百分之几。我们发现这个可以很大程度上减少一个巨大的 MapReduce 操作耗时。例如,在5.3 节描述的排序程序当备份任务机制关闭的时候使用了超过 44% 的时间才完成。
4 改进(细化)Refinements
即使 map 和 reduce 函数提供的基本函数满足于大部分需求,但我们还有些有用的扩展。这一章节我将描述它。
4.1 分区函数 Partioning Function
MapReduce 的用户可以指定 reduce 任务/输出文件的数量。使用中间的键上的分区函数讲跨任务的数据分割。默认提供的分区函数使用hash (eg: hash(key))。这可以将结果拆的很均匀。但是在一些案例中使用别的分区函数很有用。例如,结果 key 是 URL,我们想把一个 host 的 URL 放到一个输出文件里。为了支持这种场景 MapReduce 库的使用者可以提供一个特定的分区函数。例如,使用 “hash(Hostname(urlkey)) mod R” 作为分区函数就可以将属于同一个 host 的所有 URL 放到相同文件里了。
4.2 顺序性保证
我们保证在一个分区里,中间的 key/value 是按 key 升序或降序处理的。这个顺序性保证使得每个分区生成一个有序输出文件变得简单,这很用:在随机查找某个 key 速度很快以及对于全部结果排序很方便。
4.3 合并函数
在一些场景中,每个 map 任务产生的 key 有很多是重复的,而且用户定义的 reduce 函数是满足交换律和结合律的。一个很好的例子是 2.1 节描述的单词计数的例子。因为单词频率趋近于 Zipf 分布(二八原则),每个 map 任务会生产出成千上百个 <the, 1> 格式的记录。这些所有的计数会通过网络发给一个 reduce 任务,然后 reduce 任务会将他们累加成一个总数。我们允许用户指定一个可选的合并函数:在发送给 reduce 任务之前做部分的合并。
合并 函数运行在每台执行 map 任务的机器上。通常 combine(合并) 函数和 reduce 函数的代码相同,combine 函数 和 reduce 函数的不同之处在于 MapReduce 库如何处理函数的输出。reduce 函数的输出会被写到一个文件,combine 函数的结果会被写到一个中间文件将会发给一个 reduce 任务。
部分合并大幅度地提高了 MapReduce 操作的速度。 附录 A 包含了一个使用合并的例子。
4.4 输入和输出类型
MapReduce 库支持了几种不同的输入数据格式。例如, "text" 形式的输入会把每行当做一个 key/value 对:key 是文件的偏移量,value 是行的内容。其他的常见支持格式都存储着按照 key 排序的 key/value 对列表。每个输入实现都知道如何将对应的输入分成有意义的范围(部分)以便给单独的 map 任务处理。用户可以提供一个 reader 接口实现来支持新的输入类型,即使大部分用户都是使用预设的输入类型中的一个。
一个 reader 不需要提供从文件里读的数据。例如,定义一个从数据库读取记录或者从内存里读取映射数据结构的 reader 很简单。
类似的是, 我们提供多种不同输出文件的格式,用户很简单添加新的类型。
4.5 副作用
在一些场景中,MapReduce 的用户发现从他们的 map 和 reduce 操作中生成一些辅助文件作为额外输出很方便。我们依赖于应用编写者让这些副作用具有原子性和幂等性。通常应用程序会写到一个临时文件里然后当全部生成时把临时文件改名。
我们没有提供一个任务生成的多个文件的原子两阶段提交。因此,产生多文件且需要跨文件一致性需求的任务应该具有确定性(串行执行和分布式执行结果一样)。在实践中这个限制不会成为一个问题。
4.6 跳过坏的记录(Skip Bad Record)
有时,用户代码中的 bug 会导致 MapReduce 操作在处理一些固定的记录时失败,这样的 bug 会阻止 MapReduce 操作完成。通常解决办法是修复这些 bug,有时这并不可行;这些 bug 可能是在第三方库里我们不可改。有时忽略一些记录也是可以接受的,比如在对一个很大的数据集做统计分析时。我们提供一个可选的执行模式当 MapReduce 操作发现某些记录会确定导致崩溃然后会跳过它以继续向前运行。
每个 woker 安装了一个信号处理器:捕获内存段错误和总线错误。在调用一个用户 Map 或者 Reduce 操作之前,MapReduce 库会将记录的编号存储在全局变量中。如果用户代码生成一个信号,信号处理器就会发送一个包含着编号的名叫 “last gasp”的 UDP 包给 master。当 master 在一个特殊的 record 发现超过一次错误时,它告诉重新执行对应 Map 或 Reduce 任务的执行跳过这个记录。
4.7 本地执行
在 Map 或 Reduce 程序里调试以找到错误是很棘手的,因为实际的计算发生于一个分布式系统上,经常在几千台机器上,通过 master 动态调度。为了帮助方便调试,分析以及小规模测试,我开发了一个 MapReduce 库的实现可以在本地机器上串行执行一个 MapReduce 操作的所有工作。控制权提供给用户以至于可以将计算限制到指定的 map 任务。用户调用他们的程序用一个特殊的 flag ,可以很容易地使用任何调试或者测试工具。
4.8 状态信息
master 内部运行着一个 HTTP 服务器,可以导出一些状态页面给用户使用。状态页面显示着计算的进度,例如多少任务已经完成了,还有多少任务在运行中,输入字节数,中间数据字节数,输出的字节数,处理速度等等。这些页面也包含标准错误输出链接以及每个任务生成的结果文件链接。用户可以使用这些数据预测计算耗时多久,是要加入更多的资源到计算中去。当计算比预料地要慢的时候这些页面也可以用来分析原因。
4.9 计数器
MapReduce 库提供了计数器用来计数不同时间的出现次数。例如,用户代码可能想统计下处理的单词总个数或者被索引德文文档的总数。
用户可以创建一个名叫 counter 的对象然后在 Map 和/或 Reduce 函数里恰当地递增计数即可。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
来自单独 woker 机器的计数值定期地传播到 master 。master 聚合来自成功的 map 和 reduce 任务的计数值然后当 MapReduce 操作结束时返回给用户代码。当前的计数值同样会展示在状态页面上以至于用户可以看到正在执行计算的进度。当聚合计数值时,master 去掉相同 map 和 reduce 任务重复执行的影响。
一些计数值会被 MapReduce 库自动存储,例如输入 key/value 对的处理个数以及输出 key/value 生成的个数。
用户发现使用计数器对检查 MapReduce 操作的行为很有用。例如,在一些 MapReduce 操作中,用户代码希望保证生成的输出对(pair)的数量和已经处理的输入对(pair)数量绝对一致,或者处理的德文文档数量在处理过的总文档数量中比例在合理范围内。
5 性能
本章我们测量一下两个运行在大规模集群上的 MapReduce 计算的性能。一个计算是从大约 1 TB 数据里搜索一个指定的 pattern (grep)。另一个是给 1 TB 数据排序。
这两个程序是 MapReduce 用户写的众多真实程序里的代表- 一类程序是将数据从一种形式转换为另外一种形式,另外一类是从一个大数据集中抽取一小部分感兴趣的数据。
5.1 集群配置
程序的全部运行在一个由大约 1800 个机器组成的集群上。每台机器拥有 2 GHZ Intel Xeon 处理器(开启超线程),4 GB 内存,两块 160 GB IDE 磁盘,千兆以太网络链路。这些机器部署在一个两层树形交换网络中,在根节点上大概可以聚合 100-200 Gbps 的带宽。所有的机器都是对等部署的,因此机器两两之间的交互时间小于 1 ms。
除了 4G 内存之外,大约有 1-1.5 GB 预留来运行机器上的其他任务。测试程序运行在一个周末的下午,此时 CPU,磁盘以及网络是最空闲的时候。
5.2 Grep
grep 程序通过扫描 1010 个 100-字节大小的记录,搜索着一个相对罕见三个字符的 pattern (这个 pattern 大概存在于92,337个记录里)。输入数据被分成大约 64 MB 大小的片(M=15000),整个输出存放在一个输出文件里(R = 1)。
图2 展示了这个计算时间上的进度。Y 轴扫描输入数据的速度,随着越来越多的机器被分配到这个 MapReduce 计算这个速度逐渐上涨,当 1764 台 worker 参与进来的时候速度到达顶峰:超过 30 GB/s。当 map 任务结束时,速度开始下降然后大约 80 秒的时候降到了 0。整个计算从开始到结束大约花了 150 秒,这包含了 1 分钟的启动开销,这个开销包含程序复制到 woker 机器的开销以及与 GFS 的交互为了打开这 1000 个输入文件以及为本地优化获取一些需要的信息。
5.3 排序
排序程序给 1010 个 100-字节大小的记录(大约 1 TB)排序,这个程序是模仿 TeraSort 基准测试【10】的。
排序程序有小于 50 行的代码组成,一个三行的 Map 函数从一个文本行中抽取出一个 key,然后把这个 key 和 原生文本行作为中间的 key/value 对。我们使用内置的恒等函数作为 Reduce 操作符,这个函数将中间的 key/value 对不加修改地作为输出 key/value 对。最终的排序好的输出 2 副本的 GFS 文件里。
像之前一样输入数据被分成 64 MB 的片(M =15000),我们把排序的输出文件分成 4000 份,分片函数使用 key 的初始字节将其分成 R 片之一。
这个基准测试的分区函数内置了 key 的分布情况,在一般的排序程序中,我们会添加一个预处理 MapReduce 操作,这个操作主要采样 key 的分布情况以便计算最终排序数据的分割点。
图3(a)排序程序的正常执行进度,左边最上边的图展示了读输入数据的速率,速率顶峰达到了大约 13 GB/s 在 200 秒所有 map 任务结束时瞬间归 0。提醒一下输入速率小于 grep 程序,这是因为排序的 map 任务要花一半的时间和带宽把中间的输出写到他们的本地磁盘上。grep 的中间输出可以忽略不计。左边中间的图展示了数据从 map 任务通过网络发给 reduce 任务的速率,这个传输开始于第一个 map 任务结束。图里第一个坡峰对应着大约 1700 个 reduce 任务的第一批(整个 MapReduce 任务大约给安排了 1700 个机器,每个机器同时最多只能运行一个 reduce 任务)。计算到大概 300 秒的时候,reduce 任务的第一批中的一些结束了,我们开始为剩余的 reduce 任务传输。传输在计算进行到 600 秒的时候全部结束。左边底部的图展示了 reduce 任务将排序数据写入到输出文件里的速率。在第一次传输数据的结尾和写数据之间有个空隙,因为机器在忙着排序中间数据,保持了 2~4 GB/s 的速度一会儿。 写在计算进行到800秒的时候全部完成了。包含启动消耗,整个计算耗时 891 秒,这比较接近于 TeraSort 基准测试的报告结果:1057 秒。
还有一些事需要说明下:输入速度大于传输速度和输出速度得益于我们的本地优化-大部分数据从本地磁盘读取绕过我们相对受限的带宽。传输速度大于输出速度是因为输出阶段写两份排序数据的备份。我们写两份是因为这是底层文件系统提供的机制为了可用性和可靠性。如果底层系统使用擦除编码【14】而不是备份则写数据的网络带宽需求就会减少。
5.4 备份任务的影响
在 图 3(b),我们展示了一个备份任务关闭的排序程序的执行。执行流程和图 3 (a)展示的很类似,除了有很长的尾巴(很难有任何写活动发生)。960 秒后,除了 5 个 reduce 任务以外所有任务都完成了。可是这些少量的落后者直到 300 秒之后才完成。整个计算耗时 1283 秒,完成时间涨了 44%。
5.5 机器失败
在 图 3 (c),我们展示了排序的执行流程,在这个流程中,我们故意在执行的几分钟内杀死了 1746 个工作进程中的 200 个。底层集群调度器在这些机器上立马重新启动新的工作进程。
worker 挂掉的位置展示了一个负的读入速率这是因为一些之前的完成的 map 工作不见了需要重做,这些 map 任务的重新执行相对来说是快的。整个计算在 933 秒完成包含启动的消耗(只比正常执行耗时涨了 5%)。
6 实践
我们在 2003 年 1 月写了 MapReduce 库的第一个版本,到 2003 年 8 月我们做了很多增强,包括本地优化,跨机器任务执行的动态负载均衡,等等。从那个时候起,我们惊讶地发现 MapReduce 库可以广泛地应用于我们平常工作中遇到的问题。在 Google 它已经应用到很多地方了:
- 大规模机器学习问题,
- Google 新闻 和 Froogle 产品的集群问题
- 为了受欢迎查询报表生成而去抽取数据
- 为了新的实现和产品抽取 web 页面的属性
-
大规模图计算
图 4 展示了随着时间地推移,进入我们主要源码管理系统地 MapReduce 程序的显著增长。从 2003 年早些时候的 0 到 2004 年9月下旬的 900 个实例。MapReduce 之所以取得这样的成功是因为它实现了在半小时内写一个简单的程序并方便地运行在 1000 台机器上,极大地提升了开发效率和原型开发周期。更有甚者,它允许一个没有分布式和/或并行化系统经验的程序员很简单的使用大规模资源。
在每个 job 结束, MapReduce 库记录了关于 job 使用资源的统计情况。在表 1,我们展示了 Google 在 2004年 8 月 运行的一组 MapReduce 任务的一些统计。
6.1 大规模索引
在众多 MapReduce 库使用中值得纪念的是为 Google 网页搜索服务提供数据结构的生产索引系统的重写。索引系统的输入是我们爬虫系统爬取的巨大的文档集,存储成一组 GFS 文件。这些文档的原生内容大小超过了 20 TB。索引进程分成了大概 5-10 个 MapReduce 操作。 使用 MapReduce 有以下几个好处:
- 索引系统代码更简单、更小以及更易理解,因为处理容错、分布式以及并行化的代码封装到了 MapReduce 库。例如,这个索引系统的一个阶段 3800 行的 C++ 代码,使用 MapReduce 重写之后降到了 700 行。
- 我们将概念上不相关的计算分开 MapReduce 库的性能也足够好,而不是为了避免数据的传递将他们偶合在一起。这使我们很容易修改索引系统,例如,旧的索引系统一个修改需要花费几个月但是新的系统只需要花费几天。
- 索引进程也很好去运维了,因为大多数的问题都是由机器宕机,机器运行慢以及网络延迟引起的,这些问题已经被 MapRedcue 库自动解决了而不需要运维干预。还有一点,可以很简单地通过添加机器来提升性能。
7 相关工作
很多系统已经提供了严格的编程模型以及使用这些规则自动地并行化计算。例如,使用并行前缀计算【6,9,13】可以在 log N 时间内在 N 个处理器上完成大小为 N 数组上所有前缀相关运算 。MapReduce 可以被当成这些模型中并基于我们现实世界中大规模运算经验的一种简化和精华。更重要的是,我们提供了一种容错的且可以扩展到几千个处理器规模的实现。相对来说,大多数并行处理系统只是基于小规模运算来实现的而且把处理机器失败的细节留给了使用者。
批量同步编程【17】以及一些 MPI 原型【11】提供了高层次的抽象-使得程序员很容易编写并行程序。这些系统和 MapReduce 的一个重要不同之处在于 MapReduce 利用一个严格的编程模型使用户程序自动地并行化以及提供了透明的容错机制。我们的本地优化从活动磁盘【12,15】获得灵感,计算可以推到距离处理元素进的磁盘,这样节省了数据通过网络 I/O 传输的花费。我们运行在连接着少数几个磁盘的商业处理器上而不是直接运行在磁盘控制处理器上,不过大致的方法类似。
我们的备份任务机制】和 Charlotte 系统【3】中的 eager 调度机制类似。eager 调度机制的一个缺点是如果一个给定的任务重复失败则整个计算不能完成,我们用跳过坏的记录机制修复了这个问题的一些场景。
MapReduce 实现依赖于我们内部集群管理系统,它的主要职责是让用户任务分布式地运行在大规模共享机器上。即使不是这个论文的重点,这个集群管理系统在设计理念上类似于别的系统例如 Condor 【16】。
MapReduce 中的一个功能:排序类似于 NOW-sort【1】。源机器(map 机器)把数据分割成每份都排序好的然后发给 R 个 reduce workers 其中之一。每个 reduce woker 将它数据排好序。当然 NOW-sort 没有定义使我们的库变得如此广泛使用的用户自定义 map 和 reduce 函数。
River 【2】提供了一种编程模型:进程可以通过分布式队列交互。像 MapReduce , River 系统试图提供良好的平均性能,即使在异构硬件或系统扰动的情况下,为了达到这个目的,River 精心地调度网络和磁盘资源来平衡任务完成时间。MapReduce 有另一个方法,通过严格的编程模型, MapReduce 框架可以将问题拆分成以及多个细粒度的任务,这些任务可以被调度到空闲的 worker 上,因此速度快的 worker 进程可以处理更多的任务。严格的编程模型也允许我们在接近 job 结束调度起未完成子 task 的备份以减少整个 job 的完成时间。
8 结论
引用
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Holzle. ¨ Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22–28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78–91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96.Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page.
http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum.
Using MPI: Portable Parallel Programming with the
Message-Passing Interface. MIT Press, Cambridge, MA,1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831–838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335–348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103–111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.