MapReduce: Simplified Data Processing on Large Clusters
Abstract
MapReduce 是一种编程模型和相应的实现,用于处理和生成大数据集。用户指定一个 map 函数,该函数处理键/值对以生成一组中间键/值对,并指定一个 reduce 函数,该函数合并与同一中间键关联的所有中间值。正如论文所示,许多实际任务都可以用这个模型来表达。以这种函数式风格编写的程序会自动并行化,并在大规模商品机器集群上执行。
运行时系统负责处理输入数据的分区细节、跨机器集调度程序执行、处理机器故障以及管理所需的跨机器通信等问题。这使得没有并行和分布式系统经验的程序员也能轻松利用大型分布式系统的资源。
我们MapReduce的实现在大规模商业机器集群上运行,并且具有高度可扩展性:典型的MapReduce计算可以在数千台机器上处理多达数十亿字节(TB)级别的数据。程序员发现该系统易于使用:已经实现了数百个 MapReduce 程序,并且每天都有超过一千个 MapReduce 作业在 Google 的集群上执行。
1. Introduction
在过去的五年中,作者和Google的许多其他人已经实现了数百种专用计算,这些计算处理大量原始数据,如爬取的文档、网页请求日志等,以计算各种派生数据,如倒排索引、网页文档的图结构表示、每个主机爬取的页面数量汇总、给定天内最常见查询集等。大多数此类计算在概念上都很直接。然而,输入数据通常很大,并且必须将计算分布在数百台或数千台机器上才能在合理时间内完成。如何并行化计算、分布数据和处理故障等问题使得原本简单的计算被大量复杂代码所掩盖。
为了应对这种复杂性,我们设计了一种新的抽象方法,它可以让我们表达我们试图执行的简单计算,并将并行化、容错性、数据分发和负载平衡等混乱细节隐藏在库中。我们的抽象灵感来自于Lisp和许多其他函数化语言中存在的map(映射)和reduce(归约)原语。
我们意识到,我们大部分的计算涉及到对输入中每一个逻辑record应用map操作以便于计算一组中间键/值对,然后对所有具有相同中间键值的元素应用reduce操作以适当地合并派生出来的数据。 我们使用用户指定的map和reduce操作的函数模型使得我们可以轻易地并行化大规模运算,并使用重新执行作为容错性主要机制。
这项工作的主要贡献是一个简单而强大的接口,该接口能够自动并行化和分配大规模计算,并结合了这个接口的实现,可以在大型商品PC集群上实现高性能。
第2部分描述了基本编程模型并给出了几个示例。第3部分描述了针对我们基于集群的计算环境定制的MapReduce接口实现。第4部分描述了我们发现的编程模型的几种改进,并且这几种改进已经被我们发现是有用的。第5部分有关于各种任务的,以我们的实现为基础的性能的测量。第6部分探讨了Google内部使用的MapReduce,包括我们将其作为基础经验,以重写生产索引系统。第7部分讨论相关和未来工作。
希望这段翻译可以帮助你理解原文内容,并且提供一些扩展知识以供参考:MapReduce 的设计目标之一就是将数据处理任务划分到多台机器上进行处理,以便在面对海量数据时仍然可以得到快速响应。通过自动化地解决资源管理、容错、任务调度等问题,MapReduce 使得开发者能够专注于他们真正想要解决的问题,而不需要过多关注底层细节。
2. Programming Model
这种计算接收一组输入的键/值对,并产生一组输出的键/值对。MapReduce库的用户通过两个函数来表达这种计算:Map和Reduce。
由用户编写的Map函数,接受一个输入键值对,并产生一组中间键/值对。MapReduce库将所有与同一个中间键I关联的中间值分组在一起,并传递给Reduce函数。
由用户编写的Reduce函数,接受一个中间键I和该键的一组值。它将这些值合并在一起,形成可能更小的一组值。通常每次调用Reduce都会产生零个或一个输出值。中间值通过迭代器提供给用户的reduce函数。这使我们能够处理过大以至于无法装入内存的值列表。
2.1 Example
考虑在大量文档中统计每个单词出现次数的问题。用户将编写类似于以下伪代码的代码:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map函数发出每个单词以及关联的出现次数(在这个简单示例中,只是’1’)。reduce函数将针对特定单词发出的所有计数相加。
此外,用户编写代码来填充一个mapreduce specification对象,其中包含输入和输出文件的名称,以及可选的调优参数。然后用户调用MapReduce函数,并把函数传递给mapreduce specification对象。用户的代码与MapReduce库(用C++实现)链接在一起。附录A包含了这个示例的完整程序文本。
2.2 Types
尽管前面的伪代码是以字符串输入和输出来编写的,但从概念上讲,用户提供的map和reduce函数有相关联的类型:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
也就是说,输入键和值来自于一个不同于输出键和值的域。此外,中间键和值与输出键和值来自同一域。我们的C++实现将字符串传递给用户定义的函数,并让用户代码负责在字符串和适当类型之间进行转换。
2.3 More Examples
以下是一些可以轻松表达为MapReduce计算的有趣程序的简单示例。
Distributed Grep:map函数如果发现与提供的模式相匹配的内容,就会发出一行。reduce函数是一个恒等函数,只是将提供的中间数据复制到输出。
Count of URL Access Frequency:map函数处理网页请求日志,并输出〈URL, 1〉。reduce函数将相同URL的所有值加在一起,并发出一个〈URL, total count〉对。
Reverse Web-Link Graph:map函数为在名为source的页面中找到的每个targetURL的链接,之后输出< target, source >对。reduce函数连接与给定target URL关联的所有source URL列表,并发出这样一对值:〈target, list(source)〉
Term-Vector per Host:词向量汇总了在一个文档或一组文档中出现的最重要的单词,以一系列的〈word, frequency〉对表示。map函数为每个输入文档发出一个〈hostname, term vector〉对(其中主机名从文档的URL中提取)。reduce函数传递给定主机的所有的每个文档的词项向量。它将这些词项向量加在一起,丢弃频率低的术语,然后发出最终的〈hostname, term vector〉对。
Inverted Index:map函数解析每个文档,并发出一系列〈word, document ID〉对。reduce函数接受给定单词的所有对,排序相应的文档ID并发出一个 〈word, list(document ID)〉 对。所有输出对形成了一个简单倒排索引。很容易增加这种计算以跟踪单词位置。
Distributed Sort:map函数从每条记录中提取键,并发出一个 〈key, record〉 对。reduce函数不改变地发出所有对。这种计算依赖于4.1节描述的分区设施和4.2节描述的排序属性。
2.3.1 Distributed Grep(Lab1 bonus)
分布式Grep是一种在大规模数据集上执行文本搜索的技术。
在这个MapReduce示例中,”map”函数负责处理每一行输入数据(比如一个文件的内容)。如果某一行与给定的模式匹配,”map”函数就会将这一行输出。在这里,“分布式”意味着你可以同时对多个文件或数据块进行处理。
而“reduce”函数在此示例中实际上并没有做什么实质性的工作,它只是简单地将所有来自”map”函数的输出复制到最终结果中。也就是说,它起到了恒等(identity)函数的作用。
因此,“分布式Grep”的主要工作都在”map”阶段完成。通过这种方式,我们可以利用MapReduce框架并行地处理大量数据,并快速找出与特定模式匹配的文本行。
3. Implementation
许多不同的MapReduce接口实现方式是可能的。正确的选择取决于环境。例如,一个实现可能适用于小型共享内存机器,另一个适用于大型NUMA多处理器,还有一个适用于更大规模的网络机器集合。本节描述了一种针对谷歌广泛使用的计算环境而设计的实现:由廉价个人电脑组成的大规模集群,通过交换式以太网连接在一起[4]。在我们的环境中:
-
通常使用双处理器x86架构运行Linux操作系统的机器,每台机器配备2-4GB内存。
-
使用商用网络硬件 - 通常是每台机器100兆比特/秒或1千兆比特/秒,但实际上整体二分带宽要低得多。
“但整体二分带宽要低得多”的意思是,在实际操作中,由于网络中的各种因素(例如网络拥堵、硬件限制等),每台机器实际可用的网络带宽(平均值)要比理论最大值(例如100兆比特/秒或1千兆比特/秒)低很多。
这里的”二分带宽”是一个专门的网络术语,它指的是在一次操作中可以同时发送和接收数据的最大能力。在考虑一个集群或者网络系统性能时,我们通常关注这个参数,因为它决定了系统在处理大规模数据时候的效率。
-
集群由数百台或数千台机器组成,因此机器故障很常见。
-
存储由直接连接到各个机器上的廉价IDE磁盘提供。我们使用自家开发的分布式文件系统[8]来管理存储在这些磁盘上的数据。该文件系统利用复制来提供可靠性和可用性,并弥补了不可靠硬件带来的问题。
-
用户将作业提交给调度系统。每个作业由一组任务组成,并且调度程序将其映射到集群中可用的一组机器上。
这种实现方式考虑了谷歌所面临的具体计算环境特点,并采取相应策略以优化性能和可靠性。
3.1 Execution Overview

Map这个函数调用通过自动将输入数据划分为M个片段(splits)从而在多台机器上分布。这些输入切片可以由不同的机器并行处理。Reduce这个函数调用通过使用分区函数(例如,hash(key) mod R)将中间键空间划分为R个部分来进行分布。分区数(R)和分区函数由用户指定。
用户程序是用户根据实际需求自己编写的map和reduce函数。
注意Map读取的Input分片与中间结果的分片方式没有关系,二者不是一个内容。
图1显示了我们实现的MapReduce操作的整体流程。当用户程序调用MapReduce函数时,会发生以下一系列操作(图1中的编号标签对应于下面列表中的数字):
- 用户程序中的MapReduce库首先将输入文件切分成M个部分,每部分通常为16兆字节到64兆字节(MB)(用户可以通过一个可选参数进行控制)。然后它在机器集群上启动多个程序副本。
- 其中一个程序副本是特殊的 - 主节点。其余的是工作节点,由主节点分配工作。有M个map任务和R个reduce任务需要分配。主节点选择空闲的工作节点,并为每一个分配一个map任务或reduce任务。
- 被指派map任务的工作节点读取相应输入切片的内容。它从输入数据中解析出键/值对,并将每一对传递给用户定义的Map函数。由Map函数产生的中间键/值对被缓存在内存中。
- 周期性地,缓存的中间键值对被写入本地磁盘,通过分区函数划分成R个区域。这些缓存的中间键值对在本地磁盘上的位置被反馈给主节点,主节点负责将这些位置转发给reduce工作节点。
- 当reduce工作节点收到主节点关于这些位置信息的提醒时,它使用远程程序调用(RPC)来从map工作节点所在机器上读取缓存数据。当reduce worker读取了所有中间数据后,它按照中间键进行排序以使得所有的相同中间键的键值对出现在一起。排序是必需操作,因为通常许多不同键映射到同一reduce任务上面去了。如果中间数据的量特别大,且大到无法一次性装入内存,则使用外部排序方式处理。
- reduce worker遍历已经排序好的中间键值对,对于遇到的每个唯一的中间键,它传递该中间键和对应的中间值集合到用户的Reduce 函数进行处理。Reduce 函数的输出被附加到这个 reduce函数分区的最终输出文件里面去。
- 当所有 map 任务和 reduce 任务已经完成,Master程序就会唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码。、
成功完成后,mapreduce执行的输出在R个输出文件中可用(每个reduce任务一个,文件名由用户指定)。通常,用户不需要将这R个输出文件合并成一个文件 - 他们经常将这些文件作为另一个MapReduce调用的输入,或者将它们用于其他分布式应用程序,该分布式应用程序需能够处理划分到多个文件中的输入。
换句话说,在MapReduce模型中,我们并不总是需要把所有reduce任务的结果合并到一个单一的输出文件。因为在许多情况下,下一步处理(可能是另一个MapReduce计算或其他类型的分布式计算)可以直接使用这些已经被划分好的结果。这样可以避免合并文件所需的额外计算和I/O操作,并且也更符合”数据局部性”原则,即尽可能地让数据处理操作在数据所在位置进行。
3.2 Master Data Structures
主节点维护着几个数据结构。对于每个map任务和reduce任务,它存储状态((idle, in-progress, or completed),以及工作机器的身份(for non-idle tasks)。
主节点是将map任务中产生的中间文件区域的位置传播到reduce任务的管道(媒介)。因此,对于每个已完成的map任务,主节点存储由map任务产生的R个中间文件区域的位置和大小。随着map任务完成,这些位置和大小信息会被更新。这些信息会逐步推送给有进行中reduce任务的工作节点。
简单来说,主节点在MapReduce计算过程中扮演了协调者和调度者的角色。它需要跟踪所有正在执行或等待执行的Map和Reduce操作,并负责在各工作节点之间分配和调度这些操作。同时,它还需要保存关于数据位置、状态等重要信息,并在合适时机将这些信息传递给相应的工作节点以便他们能正确地找到并处理自己所需处理的数据。
3.3 Fault Tolerance
由于MapReduce库是设计用来处理大量数据的,使用了数百或数千台机器,因此该库必须能够优雅地容忍机器故障。
3.3.1 Worker Failure
主节点会定期向每个工作节点发送ping消息。如果在一定时间内没有收到工作节点的响应,主节点会将该工作节点标记为失败。由该工作节点完成的任何map任务都会被重置回初始的空闲状态,因此可以在其他工作节点上重新调度。同样,任何在失败的工作节点上进行中的map任务或reduce任务也会被重置为空闲状态,并有资格重新调度。
由于已完成的map任务的输出存储在失败机器的本地磁盘上,因此无法访问,所以需要在故障后重新执行。已完成的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统中。
当一个map任务首先由worker A执行,然后稍后由worker B执行(因为A失败),所有正在执行reduce任务的worker都会收到这个map任务的重新执行通知,以确保所有reduce worker知道这个map任务现在由B在执行,因为A故障了或者失败了。任何还没有从worker A读取数据的reduce任务将从worker B读取数据。
MapReduce能够抵御大规模worker故障。例如,在一次MapReduce操作中,在正在运行集群上进行网络维护导致一次性有80台机器无法访问几分钟。MapReduce的Master程序只是重新执行了无法访问的工作机器所做过得事情,并且持续向前推进, 最终完成 MapReduce 操作.
这种设计使得MapReduce能够处理大规模并行计算环境中常见问题如硬件故障或网络问题等,并确保整个计算过程可以正常进行和最终完成。
3.3.2 Master Failure
让主节点定期对上述主数据结构进行检查点写入是容易的。如果主任务死掉,可以从最后一个检查点状态启动一个新副本。然而,由于只有一个主节点,它的失败是不太可能的;因此我们当前的实现在主节点失败时会中止MapReduce计算。如果客户端希望,可以检查这种情况并重试MapReduce操作。
尽管单个master节点的设计简化了系统架构和调度策略,并且在大多数情况下都能工作得很好,但这也意味着master节点成为了整个系统中唯一的单点故障源(Single Point of Failure)。如果master出现问题或者崩溃,则整个MapReduce任务可能就无法继续执行。
为了解决这个问题,一种可行方案是使用故障恢复技术,比如定期保存checkpoint(检查点)。通过保存master节点当前的状态信息到硬盘或其他持久化存储设备,在master出现问题时可以从最近的checkpoint恢复其状态,并尽可能地继续执行未完成的任务。
然而,在当前实现中选择了另一种更简单但也更粗糙的策略:当master出现故障时直接中止整个MapReduce计算,并让用户自己决定是否需要重新启动任务。
3.3.3 Semantics in the Presence of Failures(在存在故障情况下的语义)
当用户提供的map和reduce操作符是其输入值的确定性函数时,我们的分布式实现产生的输出与非故障顺序执行整个程序所产生输出是一致的。我们依赖于map和reduce任务输出的原子提交来实现这一属性。
每个进行中的任务将其输出写入私有临时文件。一个reduce任务生成一个这样的文件,一个map任务生成R个这样的文件(每个reduce任务一个)。当map任务完成时,工作节点向主节点发送消息,并在消息中包含R个临时文件名。如果主节点接收到已经完成的map任务的完成消息,它会忽略该消息。否则,它会在主数据结构中记录R个文件名。
当reduce任务完成时,reduce工作节点将其临时输出文件原子性地重命名为最终输出文件。如果同一reduce任务在多台机器上执行,则对同一最终输出文件执行多次重命名调用。我们依赖底层文件系统提供的原子重命名操作来保证最终文件系统状态只包含由一次reduce任务执行产生的数据。
简单来说,在存在失败情况下,MapReduce通过使用原子性操作和特定策略确保了计算结果与单机、无故障环境下得出结果相同。具体来说,每次Map或Reduce操作都把结果写入私有临时文件,并只有在操作成功完成后才将这些结果提交(即变为可见)。因此即使出现了失败或者重复执行等问题也不会影响到最终结果。
我们的绝大多数map和reduce操作符都是确定性的,而且在这种情况下,我们的语义等同于顺序执行,这使得程序员非常容易理解他们程序的行为。当map和/或reduce操作符是非确定性的时候,我们提供了较弱但仍然合理的语义。
在存在非确定性操作符时,特定reduce任务R1的输出等价于由非确定性程序顺序执行产生的R1输出。然而,不同reduce任务R2的输出可能对应于由非确定性程序以不同的顺序执行而产生的R2输出。
“然而,不同的reduce任务R2的输出可能对应于由非确定性程序的不同顺序执行产生的R2输出。”
这句话是说,在存在非确定性操作符时,MapReduce提供一种“分离”的语义:每个Reduce操作看到并处理数据是基于它自己观察到Map阶段结果(可能包括失败和重试)所形成独立视角上进行计算。
让我们用一个例子来详细解释一下。假设我们有一个map任务M和两个reduce任务R1和R2。M是非确定性的,即它可以为相同输入生成不同输出。现在,假设M运行了两次:第一次运行生成了结果A,第二次运行生成了结果B。
在这种情况下,如果R1读取并处理了结果A,并且在处理完成后提交了它自己的最终结果;同时,由于某种原因(比如故障或超时),M需要重新执行,并在第二次执行中生成了结果B。如果此时R2读取并处理了这个新产生的结果B,并提交了它自己的最终结果。
那么你会发现,在这个例子中,虽然R1和R2都是针对相同map任务M的reduce操作,但是他们看到并处理的map输出却不一样(一个是A, 另一个是B)。也就导致他们各自提交给全局存储系统中数据可能会有所不同。
因此,“然而,不同reduce任务 R2 的输出可能对应于由非确定性程序不同顺序执行产生的 R2 输出” 这句话就意味着,在存在非确定性操作符时候(即某些步骤具有随机性),MapReduce无法保证全局一致性(即所有Reduce操作都基于相同、单一版本Map阶段得出结论)。但至少可以保证每个Reduce操作本身是正确且可复现。
考虑map任务M和reduce任务R1和R2。设e(Ri)为提交了Ri 的执行(每个任务只有一个这样的执行)。较弱语义出现是因为e(R1)可能已经读取了M一次执行产生的输出,并且e(R2)可能已经读取了M另一次执行产生的输出。
换句话说,在存在非确定性操作符时,MapReduce提供一种“分离”的语义:每个Reduce操作看到并处理数据是基于它自己观察到Map阶段结果(可能包括失败和重试)所形成独立视角上进行计算。虽然这种方式无法保证全局一致性(即所有Reduce操作都基于相同、单一版本Map结果),但它至少可以保证每个Reduce操作本身是正确并可复现(reproducible)。
3.3.4 When same reduce task is executed on multi-machines?
同一reduce任务在多台机器上执行可能出现在以下几种情况:
工作节点故障: 如果执行reduce任务的工作节点失败,系统会将该任务重新调度到其他可用的工作节点上执行。
超时: 在一些实现中,如果一个reduce任务在预定时间内没有完成,主节点可能会将该任务标记为超时,并将其重新分配给其他工作节点。
慢速运行: 当一个或多个特定的reduce任务比其他所有操作都慢(被称为“straggler”)时,主节点可能会启动这些慢速任务的额外副本。这是一种称为“备份执行”(backup execution)或“乱斗”(speculative execution)的技术,旨在通过在多台机器上并行处理相同的任务来加快整体进度。然后,首先完成此项任务的结果被接受并用于最终输出。
当启动备份执行(backup execution)时,每台执行相同reduce任务的机器都会在执行完成之前将结果保存在自己的本地临时文件中。然后,在执行完成后,它们会将结果放入全局共享存储。
这种策略有助于防止部分完成的工作被其他工作节点看到或使用,从而避免了数据不一致和错误。只有当任务成功完成后,其结果才会被提交并变为可见。如果同一个任务在多台机器上同时运行,并且有多个版本都成功完成,则通常只接受第一个完成的版本,并忽略其他版本。
请注意,在具体实现中可能存在一些差异和优化。例如,在Hadoop这样的MapReduce实现中,reduce阶段输出直接写入到HDFS(Hadoop Distributed File System)中,而不是首先写入本地临时文件。
此外,在MapReduce模型中,全局存储系统最终会保存首个完成的reduce任务的结果。如果有多个副本同时运行同一个reduce任务(例如,由于备份执行或故障恢复),则只有首个完成并成功提交结果的副本被接受。其它后续完成的副本产生的结果将被忽略。
这种策略确保了数据一致性和正确性,因为它避免了由于非确定性或并发执行引起的潜在冲突和不一致。然而,这也意味着如果一个reduce任务在多台机器上运行,并且每次运行都产生不同的结果(例如,由于非确定性操作符),那么全局存储系统最终保存哪个版本是无法预知的,因为这取决于哪个版本首先完成并成功提交。
请注意,在具体实现中可能存在一些差异和优化。例如,在Hadoop这样的MapReduce实现中,可能会使用更复杂、更健壮的方法来处理错误恢复、备份执行和数据一致性等问题。
3.4 Locality
网络带宽在我们的计算环境中是相对稀缺的资源。我们通过利用输入数据(由GFS[8]管理)存储在组成我们集群的机器的本地磁盘上这一事实来节省网络带宽。GFS将每个文件分割成64 MB的块,并在不同的机器上存储每个块的几份副本(通常是3份),这种设计旨在提高数据可靠性和可用性。即使某些机器出现故障或数据丢失,也可以从其他副本中恢复。
MapReduce主节点考虑到输入文件的位置信息,尝试在包含相应输入数据副本的机器上调度一个映射任务。如果失败,它会尝试在靠近该任务输入数据副本的地方调度一个map任务(例如,在与包含数据的机器位于同一网络交换机上的工作机器上)。
因此,在运行大规模MapReduce操作时——比如说涉及到集群内大部分工作节点时——大部分输入数据都可以通过本地读取获得,而无需通过网络传输获取。这样不仅可以显著降低对网络带宽的需求,还能提高整体计算效率。
3.5 Task Granularity
我们将map阶段划分为M个部分,将reduce阶段划分为R个部分,如上所述。理想情况下,M和R应该远大于工作机器的数量。让每个工作节点执行许多不同的任务可以提高动态负载平衡,并且当一个工作节点失败时也能加速恢复:它已完成的许多map任务可以在其他所有工作机器之间分配。
在我们的实现中,M和R有多大是有实际限制的,因为主节点必须做出O(M + R)个调度决策,并且如上所述,在内存中保持O(M * R)状态。(然而内存使用的常数因子很小:O(M * R)状态占用的数据是:要维护M*R个任务对的元数据信息,每对map任务/reduce任务占用的内存大概是一字节。)
此外,用户经常限制R,因为每个reduce任务的输出都会结束在一个单独的输出文件中。实际上,我们倾向于选择M使得每个单独的任务大约是16 MB到64 MB 的输入数据(这样上面描述的局部性优化最有效),并且我们使R成为我们预期使用的工作机器数量的小倍数。我们通常使用2,000台工作机进行MapReduce计算,其中 M = 200,000 和 R = 5,000。
这种设计旨在提高系统性能和可靠性。通过增加更多Map和Redice任务数(即增加M和R),可以更好地平衡负载并从故障中恢复。然而,在处理大规模数据时需要注意资源管理问题——例如主节点需要做出更多调度决策,并需要维护更大量状态信息。
另外值得注意是MapReduce操作通常被设计成与硬件资源紧密相关联——尤其是网络带宽和存储容量——以便最大程度地利用本地化优化策略,并兼顾计算效率与资源利用率之间找到合适平衡点。
3.5.1More Details about O(M*R)
在一个MapReduce任务中,”map”阶段被划分为’M’个部分,而”reduce”阶段被划分为’R’个部分。这意味着有M个map任务和R个reduce任务需要执行。决定有多少map任务(M)和reduce任务(R)是在任务分配的粒度和管理这些任务的开销之间找平衡。
MapReduce作业中的主节点有两个主要职责:
- 调度:主节点需要将每一个map任务和每一个reduce任务调度到集群中的工作节点。这个调度过程涉及到关于每一个任务应该在哪里(哪一个工作节点上)运行的决策,同时要考虑诸如数据本地性、工作节点之间的负载平衡等因素。由于有M个map任务和R个reduce任务,所以主节点需要做出O(M + R)次调度决策。
- 保持状态:主节点还会跟踪所有这些M*R对map/reduce 任务对的状态或进展 - 例如,一个任务是否尚未开始/正在运行/已完成/失败等,它正在哪个工作节点上运行等。这需要在主节点上存储内存。
符号O(M * R)表示随着增加 map/reduce 任务对数量, 内存需求也会增长 - 如果你在其他所有条件不变的情况下将 M 或 R 加倍, 那么用于存储状态信息的内存需求大致也会加倍。
然而, 需要注意的是尽管 O(M * R) 的关系意味着随着大量数目增加, 内存使用可能会变得非常大, 实际上每一块独立状态信息并不需要太多内存 - 根据你文本描述约为每一对 map/reduce 对占用一字节左右。
因此, 尽管由于调度开销以及保存状态信息所需内存使用量方面存在实际限制使得 M 和 R 的大小受限制, 在这些约束范围内他们仍然可以被设置得相当大以提高负载平衡与容错性.
3.6 Backup Tasks
MapReduce操作总时间延长的常见原因之一是”滞后者”:某台机器在计算过程中完成最后几个map或reduce任务所需的时间异常长。滞后者可能出现的原因有很多。
- 例如,一台磁盘有问题的机器可能会经常出现可纠正错误,使其读取性能从30 MB/s降低到1 MB/s。
- 集群调度系统可能在该机器上调度了其他任务,由于CPU、内存、本地磁盘或网络带宽竞争,导致其执行MapReduce代码速度更慢。
我们最近遇到的一个问题是机器初始化代码中存在一个错误,导致处理器缓存被禁用:受影响的机器上的计算速度减慢了一百多倍。
我们有一个通用机制来缓解滞后者问题。当MapReduce操作接近完成时,主节点会为剩余进行中任务安排备份执行。只要主执行或备份执行完成,任务就标记为已完成。我们已经调整了这种机制,因此它通常会将操作使用的计算资源比不使用Backup Tasks机制只增加不到几个百分点。我们发现这大大减少了完成大型MapReduce操作所需的时间。
举例来说,在第5.3节描述的排序程序中,如果禁用Backup Tasks机制,则排序程序需要额外的44% 的时间才能完成。
4. Refinements(改进)
尽管通过简单地编写Map和Reduce函数提供的基本功能已经足够满足大多数需求,但我们发现有一些扩展非常有用。这些扩展在本节中将会进行描述。
4.1 Partitioning Function
MapReduce的用户指定他们期望的reduce任务/输出文件数量(R)。数据通过在中间键上使用分区函数跨这些任务进行分区。提供了一个默认的分区函数,该函数使用哈希(例如 “hash(key) mod R”)。这通常会导致分区相当均衡。然而,在某些情况下,按照键的其他功能对数据进行分区可能是有用的。例如,有时输出键是URL,我们希望同一主机的所有条目都在同一个输出文件中结束。为了支持这样的情况,MapReduce库的用户可以提供一个特殊的分区函数。例如,使用 “hash(Hostname(urlkey)) mod R” 作为分区函数会使来自同一主机的所有URL都在同一个输出文件中结束。
Hostname()在这里被假设为一个函数。这个函数接收一个URL作为输入,并返回这个URL对应的主机名。所以,”hash(Hostname(urlkey)) mod R” 这个表达式的含义是先提取urlkey(也就是某个URL)的主机名,然后对主机名进行哈希运算,最后用哈希结果对R取模得到分区编号。这样可以确保来自同一主机的所有URL都会被分配到同一个输出文件中。
4.2 Ordering Guarantees
我们保证在给定分区内,中间键/值对按照中间键的递增顺序处理。这种排序保证可以很容易地为每个分区生成排序的输出文件,当输出文件格式需要支持有效的按键随机访问查找时,或者输出的用户发现对数据进行排序很方便时,这是很有用的。
以上是关于 MapReduce 分区和排序功能详细解释, 这两个特性对于控制数据如何被处理和组织以及最终结果如何呈现非常重要.
4.3 Combiner Function
在某些情况下,每个map任务产生的中间键中有大量重复,且用户指定的Reduce函数是可交换和可结合的。一个很好的例子就是2.1节中的单词计数示例。由于单词频率往往遵循齐普夫分布,每个map任务都会产生数百或数千条形如<the, 1>的记录。所有这些计数将通过网络发送到一个reduce任务,然后由Reduce函数加在一起得到一个数字。
我们允许用户指定一个可选的Combiner函数,在数据通过网络发送之前进行部分数据合并。Combiner函数在执行map任务的每台机器上执行。通常使用相同的代码来实现Combiner和reduce函数。
reduce函数和Combiner函数之间唯一的区别在于MapReduce库如何处理这个函数的输出。reduce 函数的输出写入最终输出文件。而Combiner函数的输出被写入中间文件,该中间文件将被发送给 reduce 任务。
部分结合显著提高了某些类别MapReduce操作速度。附录A包含了使用组合器例子。
4.4 Input and Output Types
MapReduce库提供了对多种不同格式的输入数据的支持。例如,”text”模式输入将每一行视为一个键值对:键是文件中的偏移量,值是行的内容。另一种常见的支持格式是按键排序的键值对序列存储。每种输入类型实现都知道如何将自身分割成有意义的范围,以便作为单独的map任务进行处理(例如,文本模式下的范围分割确保范围拆分仅在行边界处发生)。用户可以通过提供简单reader接口的实现来添加对新输入类型的支持,尽管大多数用户只使用少量预定义的输入类型之一。
reducer不一定需要提供从文件读取的数据。例如,可以轻松定义一个从数据库或映射在内存中的数据结构中读取记录的reader。类似地,我们还支持一组输出类型以产生不同格式的数据,并且用户代码很容易添加对新输出类型的支持。
4.5 Side-effects
在某些情况下,MapReduce的用户发现生成辅助文件作为他们的map和/或reduce操作符的额外输出很方便。我们依赖于应用程序编写者使这种副作用原子化和幂等。通常,应用程序写入一个临时文件,并在文件完全生成后对此文件进行原子重命名。
我们不提供对单个任务产生的多个输出文件进行原子两阶段提交的支持。因此,对于需要产生具有跨文件一致性要求的多个输出文件的任务而言,这种任务的输出应该是确定性的。这种限制在实践中从来没有成为问题。
当我们说 “我们不提供对单个任务产生的多个输出文件进行原子两阶段提交的支持”,这意味着 MapReduce 库并没有内置机制来保证一个任务生成的多个输出文件在一致性和完整性上同时提交。换句话说,如果一个任务生成了多个输出文件,并且这些文件之间有一致性要求(例如,一个文件的数据依赖于另一个文件的数据),MapReduce库并不能保证所有这些输出都会以原子方式(全部成功或全部失败)被提交。
因此,当你有这样的需求时,你需要设计你的 Map 或 Reduce 任务以确保它们是确定性的。也就是说,无论何时运行任务或在何种条件下运行任务,它们都应该产生相同、预期和正确的结果。
实际上,在大部分情况下,由于MapReduce处理流程中Map和Reduce函数设计得足够简单,并且往往只依赖于输入而非其他外部状态或结果(如其他文件),所以该限制通常不会成为实际问题。换言之,在实践中使用MapReduce时,人们通常不需要担心跨多个输出文件进行原子二阶段提交。
4.6 Skipping Bad Records
有时候,用户代码中存在错误,导致Map或Reduce函数在某些记录上确定性地崩溃。这样的错误阻止了MapReduce操作完成。通常采取的行动是修复bug,但有时候这并不可行;也许bug存在于源代码无法获取的第三方库中。另外,有时忽略一些记录是可以接受的,例如,在大数据集上进行统计分析时。
我们提供了一个可选执行模式,在其中MapReduce库检测哪些记录导致确定性崩溃,并跳过这些记录以继续前进。
每个worker进程安装一个signal handler来捕获段违规和总线错误。在调用用户Map或Reduce操作之前, MapReduce库将参数序列号存储到全局变量中。如果用户代码产生信号, 则signal handler向 MapReduce 主机发送包含序列号 “last gasp” UDP 包.
当主节点看到特定记录上出现了多次失败后, 它会指示下一次重新执行相应 Map 或 Reduce 任务时应跳过该记录。
4.7 Local Execution
在Map或Reduce函数中调试问题可能会很棘手,因为实际的计算是在一个分布式系统中进行的,通常涉及几千台机器,并且由主节点动态做出工作分配决策。为了帮助调试、性能分析和小规模测试,我们开发了一个替代的MapReduce库实现,它可以在本地机器上顺序执行所有MapReduce操作的工作。我们提供了控制给用户,以便将计算限制到特定的map任务。用户使用一个特殊标志来调用他们的程序,然后可以方便地使用任何他们认为有用的调试或测试工具(例如 gdb)。
4.8 Status Information
主节点运行一个内部HTTP服务器并导出一组供人类消费(查看)的状态页面。状态页面显示了计算进度,例如已完成多少任务、正在进行多少任务、输入的字节、中间数据的字节、输出的字节、处理速率等等。页面还包含指向每个任务生成的标准错误和标准输出文件的链接。用户可以使用这些数据预测计算需要多长时间以及是否应该添加更多资源到计算中。当计算比预期慢得多时,这些页面也可用于分析具体原因。
此外, 顶级状态页显示哪些工作节点失败, 以及它们在失败时正在处理哪些 map 和 reduce 任务。这个信息对于尝试诊断用户代码中 bug 很有用.
4.9 counters
MapReduce库提供了一个计数器设施来计数各种事件的发生次数。例如,用户代码可能想要计算处理的单词总数或索引的德语文档数量等。
要使用此设施,用户代码创建一个命名的计数器对象,然后在Map和/或Reduce函数中适当地增加计数器。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
各个worker机器上的计数器值定期传播到主节点(附加在ping响应上)。主节点汇总成功的map和reduce任务的计数器值,并在MapReduce操作完成时返回给用户代码。当前的计数器值也显示在主节点状态页面(4.8中的status information)上,以便人类可以观察实时计算的进度。
在汇总计数器值时,主节点消除了同一map或reduce任务重复执行的影响以避免重复记算。(重复执行可能源于我们使用备份任务和由于故障重新执行任务)。
一些由 MapReduce 库自动维护的计数值,比如处理过程中输入键/值对和产生输出键/值对数量。
用户发现这个计数器功能对于检查 MapReduce 操作的行为非常有用。例如, 在某些 MapReduce 操作中, 用户代码可能希望确保产生输出对数量正好等于处理输入对数量, 或者处理德语文档比例在所有处理文档总量的可容忍比例范围内.
5. Performance
测试部分一律不看。
6. Experience
我们在2003年2月编写了MapReduce库的第一个版本,并在2003年8月对其进行了重大增强,包括本地性优化、跨工作机器的任务执行的动态负载平衡等。从那时起,我们对MapReduce库适用于我们处理的问题类型的广泛性感到惊喜。它已经被Google内部的各个领域广泛使用,包括:
- 大规模机器学习问题,
- 用于Google News和Froogle产品的聚类问题,
- 提取用于生成热门查询报告(例如Google Zeitgeist)的数据,
- 提取网页属性以供新实验和产品使用(例如,从大量网页语料中提取地理位置以进行本地化搜索),以及
- 大规模图计算。

图4显示了随着时间推移,在我们主要源代码管理系统中检入的单独MapReduce程序数量显著增长,从2003年初的0个到2004年9月底几乎900个单独实例。MapReduce之所以如此成功,是因为它使得编写一个简单程序并在半小时内有效地在一千台机器上运行成为可能,极大地加快了开发和原型制作周期。此外,它允许没有分布式和/或并行系统经验的程序员轻松利用大量资源。

每个作业结束时, MapReduce 库都会记录作业使用的计算资源的统计信息. 在表1中, 我们展示了一些 2004 年 8 月 Google 运行 MapReduce 工作子集统计数据.
6.1 Large-Scale Indexing
迄今为止,我们对MapReduce的最重要用途之一是完全重写了生产索引系统,该系统生成用于Google网络搜索服务的数据结构。索引系统以我们的爬行系统检索并存储为一组GFS文件的大量文档作为输入。这些文档的原始内容超过20TB的数据。索引进程作为五到十个MapReduce操作的序列运行。
使用MapReduce(而不是前版本索引系统中的ad-hoc分布式处理)带来了几个好处:
- 索引代码更简单、更小且更易于理解,因为处理容错、分发和并行化的代码隐藏在MapReduce库中。例如,当使用MapReduce表达时,计算阶段之一的大小从大约3800行C++代码降低到大约700行。
- MapReduce库性能足够好,我们可以将概念上无关联从而可以计算保持独立,而不是将它们混合在一起,这样可以避免额外的数据传递工作。这使得改变索引进程变得容易。例如,在旧版索引系统中需要几个月才能实现的改变,在新系统中只需要几天就可以实现。
- 索引进程已经变得更加容易操作,因为由机器故障、慢机器和网络问题造成的大多数问题都由MapReduce库自动处理,并且无需操作员干预。此外, 通过向集群添加新机器很容易提高索引过程性能.
7. Related Work
许多系统都提供了受限的编程模型,并利用这些限制自动并行化计算。例如,可以在N个处理器上使用并行前缀计算在log N时间内对N个元素数组的所有前缀计算关联函数[6, 9, 13]。MapReduce可以被认为是基于我们对大规模实际计算经验的一些模型的简化和提炼。更重要的是,我们提供了一个可扩展到数千个处理器的容错实现。相比之下,大多数并行处理系统只在较小规模上实现,并将处理机器故障的细节留给程序员。
批量同步编程[17]和一些MPI原语[11]提供了更高级别的抽象,使程序员更容易编写并行程序。这些系统与MapReduce之间的一个关键区别是MapReduce利用受限制的编程模型自动并行化用户程序,并提供透明容错。
我们对本地性优化得到启发来自技术如主动磁盘[12,15],其中将计算推入接近本地磁盘的处理元素(处理器)中,以减少跨I/O子系统或网络发送数据量。我们运行在直接连接在少量磁盘的商品处理器上,而不是直接运行在硬盘控制器处理器上,但通用方法类似。
我们备份任务机制类似于Charlotte System [3] 中采用过急调度机制。简单过急调度方式存在一个问题:如果某个任务导致重复失败(一直失败),则整个计算无法完成。通过跳过错误记录机制(skipping bad records)解决了此问题中某些情况。
MapReduce实现依赖于一个内部的集群管理系统,该系统负责在大量共享机器上分发和运行用户任务。虽然这不是本文的重点,但集群管理系统在精神上与Condor[16]等其他系统相似。作为MapReduce库一部分的排序设施在操作上与NOW-Sort[1]类似。源机器(map工作器)将要排序的数据划分并发送给R个reduce工作器中的一个。每个reduce工作器在本地对其数据进行排序(如果可能,在内存中)。当然,NOW-Sort没有我们库中广泛适用的用户定义的Map和Reduce函数。
River[2]提供了一种编程模型,其中进程通过在分布式队列上发送数据进行通信。像MapReduce一样,River系统试图即使在异构硬件或系统扰动引入的非均匀性存在情况下也提供良好平均性能表现。River通过仔细调度磁盘和网络传输以实现平衡完成时间来实现这一目标。而MapReduce有不同方法:通过限制编程模型, MapReduce框架能将问题划分为大量细粒度任务. 这些任务被动态地安排到可用工作者上, 以便更快的工作者处理更多任务。受限制编程模型还允许我们在接近执行结束时安排冗余执行任务冗余, 这大大减少了非均匀性存在时的完成时间 (例如慢或卡住工作者)。
BAD-FS [5] 与 MapReduce 有着非常不同的编程模型,并且与 MapReduce 不同,它针对跨广域网络执行作业。然而,有两个基本相似之处。(1)两个系统都使用冗余执行来恢复由故障引起的数据丢失。(2)两者都使用位置感知调度(locality-aware scheduling)来减少通过拥塞网络链路发送数据量。
TACC [7] 是一个旨在简化构建高可用网络服务设计的系统。像 MapReduce 一样, 它依赖重新执行机制实现容错功能。
8. Conclusions
MapReduce编程模型已经在Google的许多不同目的中成功应用。我们将这种成功归因于几个原因。首先,该模型易于使用,即使是没有并行和分布式系统经验的程序员也能使用,因为它隐藏了并行化、容错、本地优化和负载平衡的细节。其次,大量各种问题都可以轻易地表达为MapReduce计算。例如,MapReduce被用于生成Google生产网络搜索服务的数据、排序、数据挖掘、机器学习以及许多其他系统。第三,我们开发了一个MapReduce实现,该实现可以扩展到由数千台机器组成的大型集群。这个实现有效利用了这些机器资源,因此适合在Google遇到的许多大规模计算问题上使用。
从这项工作中我们学到了几件事情:
- 首先, 限制编程模型使得并行化和分布计算、以及使这样的计算具有容错性,限制编程模型可以使这两件事情变得简单。
- 其次, 网络带宽是稀缺资源。因此, 我们系统中的一些优化目标是减少跨网络发送数据量: 本地性优化允许我们从本地磁盘读取数据, 并将中间数据写入本地磁盘保存网络带宽。
- 第三, 冗余执行可用于减少慢速机器影响以及处理机器故障和数据丢失。
back.