MapReduce论文
MapReduce: Simplified Data Processing on Large Clusters
Jeffrey Dean and Sanjay Ghemawat
jeff@google.com, sanjay@google.com
Google, Inc.
Abstract
MapReduce 是一种编程模型和相关的用于处理和生成大型 数据集的实现。用户指定一个map函数来处理一个 键/值对生成一组中间键/值 对,以及一个合并所有具有相同的中间key的中间值的reduce函数。许多 现实世界的任务在这个模型中是可以表达的,这在论文中得以体现。
以这种函数式风格编写的程序会自动并行化并在大型商用机器集群上执行。运行系统 负责处理 对输入数据进行分区、在一组机器上调度程序的执行、处理机器故障以及管理集群间的机器沟通。这让程序员无需具有任何 并行和分布式系统 的经验就可以轻松利用大型分布式系统的资源。
我们的 MapReduce 实现运行在一个大型 商品机器集群并且具有高度可扩展性: 典型的 MapReduce 计算会在数千台机器上处理数 TB 的数据。程序员 发现系统易于使用:每天实施了数百个 MapReduce 程序,并且在 Google 的集群上执行了超过一千个 MapReduce 作业。
1 Introduction
在过去的五年里,作者和谷歌的许多其他人已经实现了数百种特殊用途的计算,这些计算处理大量的原始数据,如抓取的文件、网络请求日志等,以计算各种衍生数据,如倒置指数、网络文件的图形结构的各种表示、每个主机抓取的网页数量的总结、特定一天中最频繁的查询集合等。大多数这样的计算在概念上是简单明了的。然而,输入数据通常很大,计算必须分布在数百或数千台机器上,以便在合理的时间内完成。如何并行计算、分配数据和处理故障等问题,他们打算用用大量复杂的代码来处理这些问题,掩盖了原来的简单计算。
作为对这种复杂性的反应,我们设计了一个新的抽象,使我们能够表达我们试图执行的简单计算,与此同时却把并行化、容错、数据分配和负载平衡等混乱的细节隐藏在一个库中。我们的抽象受到Lisp和许多其他函数式语言中的map和reduce原语的启发。我们意识到,我们的大多数计算涉及到对我们输入的每个逻辑 "记录 "应用一个map操作,以计算一组中间的键/值对,然后对所有共享相同键的值应用一个reduce操作,以适当地组合衍生的数据。我们使用带有用户指定的map和reduce操作的功能模型,使我们能够轻松地并行化大型计算,并将重新执行作为主要的容错机制。 这项工作的主要贡献是提供了一个简单而强大的接口,使大规模计算的自动并行化和分配成为可能,并结合这个接口的实现,在大型商品PC集群上实现了高性能。
第2节描述了基本的编程模型并给出了几个例子。第3节描述了针对我们基于集群的计算环境的MapReduce接口的实现。第4节描述了我们认为有用的编程模型的几个细化。第5节介绍了我们对各种任务实现的性能测量。第6节探讨了MapReduce在Google中的应用,包括我们将其作为重写生产索引系统的基础的经验。第7节讨论了相关的和未来的工作。
2 Programming Model
该计算接受一组输入键/值对,并产生一组输出键/值对。MapReduce库的用户将计算表达为两个函数。Map和Reduce。 由用户编写的Map接收一个输入对,并产生一组中间键/值对。MapReduce库将所有与同一中间键I相关的中间值组合在一起,并将它们传递给Reduce函数。 Reduce函数也是由用户编写的,接受一个中间键I和该键的一组值。它将这些值合并起来,形成一个可能更小的值集。一般来说,每次Reduce调用只产生0或1个输出值。中间值是通过一个迭代器提供给用户的Reduce函数。这使得我们能够处理那些大到无法在内存中容纳的值列表。
2.1 Example
考虑到计算大量文档中每个词的出现次数的问题。用户将编写类似于以下伪代码的代码:
(tips:下面程序是单词计数)
map函数发射每个单词和相关的出现次数(在这个简单的例子中只有'1')。reduce函数将所有为某一特定单词发出的计数相加。 此外,用户还要写代码,将输入和输出文件的名称以及可选的调整参数填入mapreduce规范对象。然后用户调用MapReduce函数,将规范对象传给它。用户的代码与MapReduce库(用C++实现)连接在一起。附录A包含这个例子的完整程序文本。
2.2 Types
尽管前面的伪代码是以字符串的输入和输出来写的,但从概念上讲,用户提供的map和reduce函数有相关的类型。 map (k1,v1) →list(k2,v2) reduce (k2,list(v2)) →list(v2) 也就是说,输入的键和值与输出的键和值取自不同的域。此外,中间的键和值与输出的键和值来自同一个域。 我们的C++实现将字符串传递给用户定义的函数,让用户代码在字符串和适当类型之间进行转换。
2.3 More Examples
下面是几个简单的例子,这些有趣的程序可以很容易地表达为MapReduce的计算。
Distributed Grep: map函数如果与提供的模式相匹配,就会发出一行。reduce函数是一个身份函数,只是把提供的中间数据复制到输出。 Count of URL Access Frequency:map函数处理网页请求的日志并输出<URL, 1>。还原函数将同一URL的所有值加在一起,并发出<URL,total count>对。
Reverse Web-Link Graph: map函数为在一个名为source的页面中发现的每个目标URL的链接输出<target,source>对。reduce函数将与一个给定的目标URL相关的所有源URL的列表连接起来,并发出一对:<target,list(source)>
Term-Vector per Host: A term vector将一个文件或一组文件中出现的最重要的词总结为一个<word,frequency>对的列表。map函数为每个输入的文档发出一个<hostname,term vector>对(其中hostname是从文档的URL中提取的)。reduce函数被传递给一个给定的主机的所有每个文档term vector。它把这些term vector加在一起,扔掉不经常出现的terms,然后发出一个最终的<hostname,term vector>对。
Inverted Index: map函数解析每个文档,并发出一连串的<word,document ID>对。reduce函数接受一个给定单词的所有对,对相应的文档ID进行排序,并发出一个<word,list(document ID)>对。所有输出对的集合形成一个简单的倒置索引。很容易对这个计算进行扩充,以保持对单词位置的跟踪。
Distributed Sort: map函数从每条记录中提取键,并发出一个<key,record>对。还原函数将所有的配对都释放出来,不做任何改变。这个计算依赖于第4.1节中描述的分区设施和第4.2节中描述的排序属性。
3 Implementation
MapReduce接口有许多不同的实现方式。正确的选择取决于环境。例如,一种实现可能适合小型共享内存机器,另一种适合大型NUMA多处理器,还有一种适合更大的网络机器集合。 本节描述了一个针对谷歌广泛使用的计算环境的实现:用交换式以太网[4]连接起来的大型商品PC集群。在我们的环境中。
(1)机器通常是运行Linux的双处理器X86处理器,每台机器有2-4GB的内存。 (2)使用商品网络硬件--通常在机器层面上是100兆/秒或1千兆/秒,但平均下来,整体分节带宽要小得多。 (3)一个集群由数百或数千台机器组成,因此机器故障很常见。 (4)存储是由直接连接到各个机器的廉价IDE磁盘提供的。一个内部开发的分布式文件系统[8]被用来管理存储在这些磁盘上的数据。该文件系统使用复制来在不可靠的硬件上提供可用性和可靠性。 用户向一个调度系统提交jobs。每个job由一组tasks组成,并由调度器映射到集群中的一组可用机器上。
3.1 Execution Overview(执行概述)
通过将输入数据自动划分为一组M个分片,Map调用被分布在多个机器上。输入的分片可以由不同的机器并行处理。Reduce调用是通过使用分区函数(例如,hash(key) modR)将中间的密钥空间划分为R块来进行分配。分区的数量(R)和分区函数由用户指定。 图1显示了我们实现中的MapReduce操作的整体流程。当用户程序调用MapReduce函数时,会发生以下一系列动作(图1中的数字标签对应于下面列表中的数字):
1.用户程序中的MapReduce库首先将输入文件分成M块,每块通常为16兆到64兆(MB)(可由用户通过一个可选参数控制)。然后,它在一个机器集群上启动了许多程序副本。 2.其中一个程序副本是特殊的--Master。其余的是由Master分配工作的Worker。有M个map任务和R个reduce任务需要分配。master挑选空闲的worker,给每个人分配一个map任务或一个reduce任务。 3.一个被分配到map任务的worker读取相应的输入分片的内容。它从输入数据中解析出k/v对,并将每个k/v对传递给用户定义的Map函数。由Map函数产生的中间k/v对被缓冲在内存中。 4.定期将缓冲的数据对写入本地磁盘,并通过分区函数将其划分为R区域。这些缓冲对在本地磁盘上的位置被传回给master,master负责将这些位置转发给reduce workers。 5.当一个reduce worker收到master关于这些位置的通知时,它使用远程过程调用,从map worker的本地磁盘上读取缓冲数据。当一个reduce worker读取了所有的中间数据后,它将按照中间键进行排序,以便将所有相同键的出现归为一组。之所以需要排序,是因为通常许多不同的键会映射到同一个reduce任务。如果中间数据量太大,无法在内存中容纳,就会使用一个外部排序。
6.reduce worker在排序的中间数据上进行迭代,对于遇到的每个唯一的中间键,它将键和相应的中间值集传递给用户的reduce函数。Reduce函数的输出被附加到这个Reduce分区的最终输出文件中。 7.当所有的map任务和Reduce任务都完成后,master会唤醒用户程序。这时,用户程序中的MapReduce调用会返回到用户代码中。
成功执行后,mapreduce执行的输出可在R输出文件中获得(每个reduce任务输出一个,文件名由用户指定)。通常情况下,用户不需要将这些R输出文件合并成一个文件--他们经常将这些文件作为输入传递给另一个MapReduce调用,或者从另一个能够处理被分割成多个文件的输入的分布式应用程序中使用这些文件。
3.2 Master Data Structures
master保留了几个数据结构。对于每个map任务和reduce任务,它都存储了他们的状态(空闲、进行中或已完成),以及worker机器的身份(对于非空闲任务)。 master是一个通道,中间文件区域的位置通过它从map任务传播到reduce任务。因此,对于每个已完成的map任务,master存储了由map任务产生的R个中间文件区域的位置和大小。随着map任务的完成,会收到对这个位置和大小信息的更新。这些信息被逐渐地推送给正在进行着reduce任务的worker。
3.3 Fault Tolerance (容错性)
由于MapReduce库被设计为使用成百上千台机器帮助处理非常大量的数据,所以该库必须优雅地容忍机器故障。
Worker Failure
master定期对每个worker进行ping。如果在一定时间内没有收到worker的响应,主站将该worker标记为失败。该worker完成的任何map任务都会被重置为初始的空闲状态,因此有资格在其他worker上进行调度。同样,失败的worker上正在进行的任何map任务或reduce任务也被重置为空闲状态,并有资格进行重新调度。 已完成的map任务在故障时被重新执行,因为它们的输出被存储在故障机器的本地磁盘上,因此无法访问。已完成的Reduce任务不需要重新执行,因为它们的输出被存储在全局文件系统中。 当一个map任务先由worker A执行,然后再由worker B执行时(因为A失败了),所有执行reduce任务的worker都会收到重新执行的通知。任何尚未从worker A读取数据的reduce任务都会从worker B读取数据。 MapReduce对大规模的worker故障有很强的弹性。例如,在一次MapReduce操作中,正在运行的集群上的网络维护导致一次80台机器的集群在几分钟内无法连接。MapReduce master简单地重新执行了无法到达的worker机器所做的工作,并继续向前推进,最终完成了MapReduce操作。
Master Failure
让master写出上述主数据结构的定期检查点是很容易的。如果master死亡,可以从最后一个检查点的状态开始一个新的副本。然而,鉴于只有一个master,它的失败(奔溃)是不太可能的;因此,我们目前的实现是在master失败时中止了MapReduce计算。客户端可以检查这种情况,如果他们愿意,可以重试MapReduce操作。
Semantics in the Presence of Failures
(失败情况下的语义分析)
当用户提供的map和reduce操作符是其输入值的确定性函数时,我们的分布式实现会产生与整个程序的非故障顺序执行所产生的相同的输出。 我们依靠map和reduce任务输出的原子提交来实现这一特性。每个进行中的任务都将其输出写入私有的临时文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每个reduce任务一个)。当一个map任务完成时,worker会向master发送一个消息,并在消息中包括R个临时文件的名称。如果master收到一个已经完成的map任务的完成消息,它将忽略该消息。否则,它会在主数据结构中记录R文件的名称。 当一个reduce任务完成时,reduce worker会将其临时输出文件原子化地重命名为最终输出文件。如果同一个reduce任务在多台机器上执行,那么对于同一个最终输出文件,将执行多个重命名调用。我们依靠底层文件系统提供的原子重命名操作来保证最终的文件系统状态只包含reduce任务的一次执行所产生的数据。 我们绝大部分的map和reduce操作都是确定的,在这种情况下,我们的语义等同于顺序执行,这使得程序员很容易推理他们的程序行为。当map and/or reduce操作符是不确定的,我们提供了较弱但仍然合理的语义。在非确定性操作存在的情况下,一个特定的reduce任务R1的输出等同于非确定性程序的顺序执行所产生的R1的输出。然而,不同的reduce任务R2的输出可能对应于非确定性程序的不同顺序执行所产生的R2的输出。 考虑map任务M和reduce任务R1和R2。让e(Ri)成为Ri的执行结果(正好有一个这样的执行)。由于e(R1)可能已经读取了M的一个执行所产生的输出,而e(R2)可能已经读取了M的另一个执行所产生的输出,因此出现了较弱的语义。
3.4 Locality(定位)
在我们的计算环境中,网络带宽是一种相对稀缺的资源。我们通过利用输入数据(由GFS[8]管理)存储在构成我们集群机器的本地磁盘上这一事实来节约网络带宽。GFS将每个文件分成64MB的块,并将每个块的几个副本(通常是3个副本)存储在不同的机器上。MapReduce主程序考虑到了输入文件的位置信息,并试图在包含相应输入数据副本的机器上安排一个map任务。如果做不到这一点,它就会尝试在该任务的输入数据副本附近安排一个map任务(例如,在与包含数据的机器在同一网络交换机上的worker机器上)。当在集群中相当一部分worker上运行大型MapReduce操作时,大多数输入数据都在本地读取,不消耗网络带宽。
3.5 Task Granularity(任务颗粒度)
如上所述,我们将map阶段细分为M块,将reduce阶段细分为R块。理想情况下,M和R应该比worker机器的数量大得多。让每个worker执行许多不同的任务可以改善动态负载均衡,也可以在一个worker失败时加速恢复:它所完成的许多map任务可以分散到所有其他worker机器上。 在我们的实现中,对M和R的大小是有实际限制的,因为主站必须做出O(M+R)的调度决定,并在内存中保持O(M∗R)的状态,如上所述。(然而,内存使用的恒定因素很小:状态的O(M∗R)部分由每个map任务/reduce任务对的大约一个字节的数据组成)。 此外,由于每个reduce任务的输出最终都在一个单独的输出文件中,所以R经常受到用户的限制。在实践中,我们倾向于选择M,使每个单独的任务大约是16MB到64MB的输入数据(以便上述的定位优化最有效),并且我们使R成为我们预期使用的worker机器数量的几倍(比较small的倍数)。我们经常在M=200,000和R=5,000的情况下进行MapReduce计算,使用2,000台工作机。
3.6 Backup Tasks(备份任务)
延长MapReduce操作总时间的常见原因之一是 "straggler(滞留者)":一台机器花了异常长的时间来完成计算中的最后几个map或reduce任务之一。straggler(滞留者)产生的原因有很多。例如,一台磁盘坏了的机器可能会经常出现可纠正的错误,使其读取性能从30MB/s降到1MB/s。集群调度系统可能在机器上安排了其他任务,由于对CPU、内存、本地磁盘或网络带宽的竞争,导致它执行MapReduce代码的速度变慢。我们最近遇到的一个问题是机器初始化代码中的一个错误,导致处理器缓存被禁用:受影响的机器上的计算速度降低了一百多倍。 我们有一个通用的机制来缓解straggler(滞留者)的问题。当一个MapReduce操作接近完成时,master会安排其余正在进行的任务的备份执行。只要主任务或备份执行完成,该任务就被标记为完成。我们对这一机制进行了调整,使其通常会使操作所使用的计算资源增加不超过百分之几。我们发现,这大大减少了完成大型MapReduce操作的时间。举个例子,第5.3节中描述的排序程序,当备份任务机制被禁用时,完成的时间要长44%。
**4 **Refinements(完善)
尽管简单地编写Map和Reduce函数所提供的基本功能足以满足大多数需求,但我们发现有一些扩展很有用。本节将介绍这些扩展。
4.1 Partitioning Function(分区功能)
MapReduce的用户指定他们想要的reduce任务/输出文件的数量(R)。数据在这些任务中使用中间键的分区函数进行分区。提供了一个默认的分区函数,使用散列法(例如 "hash(key)mod R")。这往往会导致相当均衡的分区。然而,在某些情况下,通过密钥的一些其他函数来划分数据是很有用的。例如,有时输出键是URL,而我们希望一个主机的所有条目最终都在同一个输出文件中。为了支持这样的情况,MapReduce库的用户可以提供一个特殊的分区函数。例如,使用 "hash(Hostname(urlkey))mod R "作为分区函数会使同一主机的所有URL最终出现在同一输出文件中。
4.2 Ordering Guarantees
我们保证在一个给定的分区内,中间的键/值对是按 键 的递增顺序处理的。这种排序保证使得每个分区很容易产生一个排序的输出文件,这在输出文件格式需要支持有效的随机访问键的查找,或者输出的用户发现数据排序很方便的时候是很有用的。
4.3 Combiner Function
在某些情况下,每个map任务产生的中间键有很大的重复性,而用户指定的Reduce函数是可交换的和关联的。这方面的一个很好的例子是第2.1节中的单词计数例子。由于单词频率倾向于遵循Zipf分布,每个map任务都会产生成百上千个或数以千计的<the, 1>形式的记录。所有这些计数都将通过网络发送到一个Reduce任务,然后由Reduce函数相加,产生一个数字。我们允许用户指定一个可选的Combiner函数,在通过网络发送之前对这些数据进行部分合并。 Combiner函数在每个执行map任务的机器上执行。通常情况下,同样的代码被用来实现combiner(组合器)和reduce函数。reduce函数和组合函数的唯一区别是MapReduce库如何处理函数的输出。reduce函数的输出被写到最终的输出文件中。组合器函数的输出被写入一个中间文件,该文件将被发送到一个reduce任务。 部分组合大大加快了某些类别的MapReduce操作。附录A包含一个使用组合器的例子。
4.4 Input and Output Types
MapReduce库提供了对读取几种不同格式的输入数据的支持。例如,"文本 "模式的输入将每一行视为一个键/值对:键是文件中的偏移(理解为行数?),值是该行的内容。另一种常见的支持格式是存储一个按键排序的键/值对的序列。每个输入类型的实现都知道如何将自己分割成有意义的范围,作为单独的map任务进行处理(例如,文本模式的范围分割确保范围分割只发生在行的边界)。用户可以通过提供一个简单的阅读器接口的实现来增加对新输入类型的支持,尽管大多数用户只是使用少量预定义输入类型中的一种。 读取器不一定需要提供从文件中读取的数据。例如,很容易定义一个从数据库或从内存中映射的数据结构中读取记录的阅读器。 以类似的方式,我们支持一组输出类型,用于产生不同格式的数据,而且用户代码很容易增加对新输出类型的支持。
4.5 Side-effects
在某些情况下,MapReduce的用户发现产生辅助文件作为他们的map 和/或 reduce操作者的额外输出是很方便的。我们依靠应用程序的编写者来使这种副作用成为原子性的和空闲的。通常情况下,应用程序写入一个临时文件,一旦该文件被完全生成,则原子地重命名该文件。 我们不提供对一个任务产生的多个输出文件的原子两阶段提交的支持。因此,产生多个具有跨文件一致性要求的输出文件的任务应该是确定的。这一限制在实践中从未出现过问题。
4.6 Skipping Bad Records
有时,用户代码中会出现一些bug,导致Map或Reduce函数在某些记录上确定性地崩溃。这样的bug会阻碍MapReduce操作的完成。通常的做法是修复错误,但有时这并不可行;也许错误是在第三方库中,而源代码是不可用的。另外,有时忽略几条记录也是可以接受的,比如在大数据集上做统计分析时。我们提供了一种可选的执行模式,MapReduce库会检测哪些记录会导致确定性崩溃,并跳过这些记录,以便向前推进。 每个工作进程都安装了一个信号处理程序,可以捕捉到分段违反和总线错误。在调用用户的Map或Reduce操作之前,MapReduce库将参数的序列号存储在一个全局变量中。如果用户代码产生了一个信号,信号处理程序会向MapReduce主站发送一个包含序列号的 "last gasp "UDP数据包。当master在某一特定记录上看到一个以上的故障时,它表示在发出相应的Map或Reduce任务的下一次重新执行时应跳过该记录。
4.7 Local Execution
调试Map或Reduce函数中的问题是很棘手的,因为实际计算是在分布式系统中进行的,往往是在几千台机器上进行的,工作分配的决定是由master动态做出的。为了方便调试、分析和小规模测试,我们开发了一个MapReduce库的替代实现,它在本地机器上按顺序执行一个MapReduce操作的所有工作。我们为用户提供了控制措施,以便将计算限制在特定的map任务上。用户用一个特殊的标志来调用他们的程序,然后可以很容易地使用他们认为有用的任何调试或测试工具(如gdb)。
4.8 Status Information
master运行一个内部的HTTP服务器,并输出一组状态页供人使用。这些状态页面显示了计算的进度,例如有多少任务已经完成,有多少任务正在进行,输入的字节数,中间数据的字节数,输出的字节数,处理率等等。这些页面还包含每个任务产生的标准错误和标准输出文件的链接。用户可以使用这些数据来预测计算将需要多长时间,以及是否应该在计算中添加更多的资源。这些页面也可以用来计算出计算比预期慢得多的时候。 此外,顶层的状态页面显示了哪些worker失败了,以及他们失败时正在处理哪些map和reduce任务。这些信息在尝试诊断用户代码中的错误时非常有用。
4.9 Counters(计数器)
MapReduce库提供了一个计数器,用来计算各种事件的发生次数。例如,用户代码可能想计算处理的总字数或索引的德语文档的数量等。 要使用这个工具,用户代码要创建一个命名的计数器对象,然后在Map和/或Reduce函数中适当地增加该计数器。例如:
来自各个worker机器的计数器值会定期传播到master(捎带着Ping响应)。master汇总来自成功的map和reduce任务的计数器值,并在MapReduce操作完成后将其返回给用户代码。当前的计数器值也会显示在master的状态页面上,这样人们就可以观察到实时计算的进展。在汇总计数器值时,主站会消除同一map或reduce任务的重复执行的影响,以避免重复计算。(重复执行可能来自于我们对备份任务的使用,以及由于故障而重新执行的任务)。 有些计数器的值是由MapReduce库自动维护的,比如处理的输入键/值对的数量和产生的输出键/值对的数量。 用户发现计数器对检查MapReduce操作的合理性非常有用。例如,在某些MapReduce操作中,用户代码可能希望确保产生的输出对的数量与处理的输入对的数量完全相等,或者确保处理的德语文件的比例在处理的文件总数的某个可容忍的分数之内。
5 Performance
在本节中,我们测量了MapReduce在一个大型机器集群上运行的两个计算的性能。其中一个计算在大约一兆字节的数据中搜索,寻找一个特定的模式。另一个计算则对大约一兆字节的数据进行排序。 这两个程序是MapReduce用户编写的真实程序的一个很大的子集--一类程序将数据从一个表示法洗(shuffles )到另一个表示法,另一类程序从大数据集中提取少量有趣的数据。
5.1 Cluster Configuration
所有的程序都在一个由大约1800台机器组成的集群上执行。每台机器都有两个2GHz的英特尔至强处理器,启用了超线程技术,4GB内存,两个160GB的IDE磁盘,以及一个千兆以太网链接。这些机器被安排在一个两层的树形交换网络中,根部有大约100-200Gbps的总带宽。所有的机器都在同一个主机设施中,因此任何一对机器之间的往返时间都小于一毫秒。
在4GB的内存中,大约1-1.5GB被集群上运行的其他任务所保留。这些程序是在一个周末的下午执行的,当时CPU、磁盘和网络大多是闲置的。
5.2 Grep
grep程序扫描了1010条100字节的记录,寻找一个相对罕见的三字符模式(该模式出现在92,337条记录中)。输入被分成大约64MB(M=15000),而整个输出被放在一个文件中(R=1)。 图2显示了随时间变化的计算进度。Y轴显示输入数据被扫描的速度。随着越来越多的机器被分配到这个MapReduce计算中,速度逐渐加快,当分配到1764个工人时,速度达到峰值,超过30GB/s。随着map任务的完成,速率开始下降,并在计算的80秒左右达到零。整个计算从开始到结束大约需要150秒。这包括大约1分钟的启动开销。这笔开销是由于程序传播到所有的worker机器上,以及与GFS互动以打开1000个输入文件集和获得位置(locality )优化所需的信息的延迟。
Figure 2: Data transfer rate over time
5.3 Sort
该排序程序对1010条100字节的记录进行排序(大约1兆字节的数据)。这个程序是以TeraSort基准[10]为模型的。
(关于TeraSort也挺有意思的,可以看看https://its201.com/article/yongjian_luo/9955379)
该排序程序由不到50行的用户代码组成。一个三行的Map函数从一个文本行中提取一个10字节的排序键,并将该键和原始文本行作为中间键/值对发射出去。我们使用一个内置的Identity函数作为Reduce操作符。这个函数将中间的键/值对作为输出的键/值对,没有改变。最终的排序输出被写入一组2路复制的GFS文件(即2兆字节被写入程序的输出)。
和以前一样,输入数据被分割成64MB的碎片(M=15000)。我们将排序后的输出划分为4000个文件(R=4000)。分区函数使用钥匙的初始字节将其隔离到R块中的一个。我们对这个基准的分区函数有关于键的分布的内置知识。在一般的排序程序中,我们会添加一个预处理的MapReduce操作,收集钥匙的样本,并使用采样的钥匙的分布来计算最终排序的分割点。
图3(a)显示了排序程序的正常执行进度。左上图显示了读取输入的速度。速率在13GB/s时达到峰值,并很快消失,因为所有的map任务都在200秒之前完成。请注意,输入率比grep要低。这是因为排序map任务花了大约一半的时间和I/O带宽将中间输出写到本地磁盘上。grep的相应中间输出的大小可以忽略不计。
左边中间的图显示了数据通过网络从map任务发送到reduce任务的速度。这种洗牌(shuffle)在第一个map任务完成后就开始了。图中的第一个驼峰是第一批大约1700个reduce任务(整个MapReduce被分配了大约1700台机器,每台机器每次最多执行一个reduce任务)。在计算进行了大约300秒后,这些第一批reduce任务中的一些完成了,我们开始为剩余的reduce任务洗数据。所有的洗牌工作在计算的600秒左右完成。左下图显示reduce任务将排序后的数据写入最终输出文件的速度。在第一个洗牌期结束和写入期开始之间有一个延迟,因为机器正忙于对中间数据进行排序。写入工作以大约2-4GB/s的速度持续了一段时间。所有的写入工作在计算过程中大约850秒完成。包括启动开销,整个计算需要891秒。这与目前TeraSort基准的最佳报告结果1057秒相似[18]。 有几件事需要注意:输入率高于洗牌率和输出率,因为我们进行了定位优化--大多数数据是从本地磁盘读取的,绕过了我们相对带宽有限的网络。洗牌率高于输出率是因为输出阶段写了两份排序后的数据(出于可靠性和可用性的考虑,我们对输出做了两个副本)。我们写两个副本是因为这是我们的底层文件系统所提供的可靠性和可用性机制。 如果底层文件系统使用erasure 编码[14]而不是replication,那么写入数据的网络带宽需求将减少。
5.4 Effect of Backup Tasks(备份任务的影响)
在图3(b)中,我们展示了在禁用备份任务的情况下执行排序程序的情况。执行流程与图3(a)所示相似,除了有一个非常长的尾巴,几乎没有任何写入活动发生。960秒后,除了5个reduce任务外,其他的都完成了。然而,这最后几个落伍者(stragglers )直到300秒后才完成。整个计算过程需要1283秒,耗时增加了44%。
5.5 Machine Failures
在图3(c)中,我们展示了排序程序的执行情况,在计算的几分钟内,我们故意杀死了1746个工人进程中的200个。底层集群调度器立即在这些机器上重新启动了新的工作进程(因为只有进程被杀死,机器仍在正常运行)。
worker的死亡显示为负的输入率,因为一些先前完成的map工作消失了(因为相应的map worker被杀),需要重新做。这个map工作的重新执行发生得比较快。整个计算在933秒内完成,包括启动开销(只是比正常执行时间增加了5%)。
6 Experience
我们在2003年2月编写了MapReduce库的第一个版本。 2003年2月,我们编写了第一个版本的MapReduce库,并在2003年8月对其进行了重大改进,包括位置(locality)优化、worker上任务执行的动态负载均衡等。从那时起,我们就对MapReduce库在我们所处理的各种问题中的广泛适用性感到惊喜。它已经被用于谷歌内部的各种领域,包括:
大规模的机器学习问题。
谷歌新闻和Froogle产品的聚类问题。Froogle产品的分类问题
提取数据用于制作流行查询的报告(如Google Zeitgeist)。
为新的实验和产品提取网页的属性(例如,从大量的网页语料库中提取地理位置用于本地化搜索),以及
大规模的图计算。
图4显示了随着时间的推移,检查到我们的主要源代码管理系统中的独立MapReduce程序数量的显著增长,从2003年初的0到2004年9月底的近900个独立实例。MapReduce之所以如此成功,是因为它可以在半小时内编写一个简单的程序并在一千台机器上高效运行,大大加快了开发和原型设计周期。此外,它允许没有分布式和/或并行系统经验的程序员轻松利用大量的资源。
在每个作业结束时,MapReduce库会记录该作业所使用的计算资源的统计数据。在表1中,我们展示了2004年8月在Google运行的MapReduce作业子集的一些统计数据。
6.1 Large-Scale Indexing
迄今为止,我们对MapReduce最重要的应用之一是完全重写了生产索引系统,该系统产生了用于谷歌网络搜索服务的数据结构。该索引系统的输入是由我们的爬行系统检索的大量文件,这些文件以一组GFS文件的形式存储。这些文件的原始内容是超过20兆字节的数据。索引过程以五到十次MapReduce操作的序列运行。使用MapReduce(而不是先前版本的索引系统中的临时分布式传递)提供了几个好处:
索引代码更简单、更小、更容易理解,因为处理容错、分布和并行化的代码都隐藏在MapReduce库中。例如,使用MapReduce表达时,一个阶段的计算规模从大约3800行C++代码下降到大约700行。
MapReduce库的性能足够好,我们可以把概念上不相关的计算分开,而不是把它们混在一起,以避免对数据的额外传递。这使得我们很容易改变索引的过程。例如,在我们旧的索引系统中花了几个月时间做出的一个改变,在新的系统中只花了几天时间就实现了。
索引过程变得更容易操作,因为大部分由机器故障、慢速机器和网络故障引起的问题都由MapReduce库自动处理,不需要操作员干预。此外,通过在索引集群中增加新的机器,很容易提高索引过程的性能。
**7 **Related Work
许多系统提供了限制性的编程模型,并利用这些限制来自动并行化计算。例如,一个关联函数可以在N个处理器上使用并行前缀计算在对数N个元素数组的所有前缀上进行计算[6, 9, 13]。MapReduce可以说是基于我们在大型现实世界计算中的经验,对其中一些模型的简化和提炼。更重要的是,我们提供了一个可扩展到数千个处理器的容错实现。相比之下,大多数并行处理系统只在较小的规模上实现,并将处理机器故障的细节留给了程序员。
批量同步编程[17]和一些MPI原语[11]提供了更高层次的抽象,使程序员更容易编写并行程序。这些系统与MapReduce的一个关键区别是,MapReduce利用限制性的编程模型来自动并行化用户程序,并提供透明的容错。
我们的定位(locality )优化从主动磁盘[12,15]等技术中获得灵感,在这些技术中,计算被推到靠近本地磁盘的处理元件中,以减少跨I/O子系统或网络的数据量。我们在直接连接了少量磁盘的商品处理器上运行,而不是直接在磁盘控制器处理器上运行,但一般的方法是相似的。
我们的备份任务机制类似于夏洛特系统3中采用的急切调度机制。简单的急切调度的缺点之一是,如果一个给定的任务导致重复失败,整个计算就无法完成。我们通过跳过坏记录的机制来解决这个问题的一些实例。
MapReduce的实现依赖于一个内部的集群管理系统,该系统负责在大量的共享机器上分配和运行用户任务。虽然不是本文的重点,但该集群管理系统在精神上与其他系统如Condor[16]相似。
作为MapReduce库的一部分,其排序设施在操作上与NOW-Sort[1]相似。源机器(map worker)对要排序的数据进行分区(partition ),并将其发送给R reduce worker之一。每个reduce worker在本地对其数据进行排序(如果可能的话,在内存中)。当然,NOW-Sort没有用户可定义的Map和Reduce功能,而这些功能使我们的库广泛适用。 River[2]提供了一个编程模型,进程之间通过分布式队列发送数据进行通信。像MapReduce一样,River系统试图提供良好的平均性能,即使在异构硬件或系统扰动引入的非均匀性情况下。River通过对磁盘和网络传输的精心调度来实现这一目标,以达到平衡的完成时间。MapReduce有一个不同的方法。通过限制编程模型,MapReduce框架能够将问题分割成大量的细粒度任务。这些任务被动态地安排在可用的工作器上,以便更快的工作器能够处理更多的任务。受限的编程模型还允许我们在工作接近尾声时安排任务的冗余执行,这在存在非均匀性(如缓慢或卡住的worker)的情况下大大减少了完成时间。 BAD-FS[5]的编程模型与MapReduce非常不同,与MapReduce不同的是,它针对的是跨广域网的作业执行。然而,有两个基本的相似之处。(1) 两种系统都使用冗余执行来恢复由故障引起的数据丢失。(2) 两者都使用局部感知调度,以减少在拥挤的网络链接中发送的数据量。 TACC[7]是一个旨在简化构建高可用性网络服务的系统。像MapReduce一样,它依靠重新执行作为实现容错的机制。
8 Conclusions
在谷歌,MapReduce编程模型已被成功用于许多不同的目的。我们把这种成功归功于几个原因。首先,该模型很容易使用,即使是没有并行和分布式系统经验的程序员,因为它隐藏了并行化、容错、位置优化和负载均衡等细节。其次,大量的问题很容易被表达为MapReduce的计算。例如,MapReduce被用于谷歌生产的网络搜索服务的数据生成,用于排序,用于数据挖掘,用于机器学习,以及其他许多系统。第三,我们开发了一个MapReduce的实现,可以扩展到由成千上万台机器组成的大型集群。该实现有效地利用了这些机器资源,因此适合用于谷歌公司遇到的许多大型计算问题。
我们从这项工作中学到了几件事。首先,限制编程模型使得并行化和分布式计算变得容易,并使这种计算具有容错性。第二,网络带宽是一种稀缺资源。因此,我们系统中的一些优化是以减少网络上发送的数据量为目标的:位置性(locality)优化使我们能够从本地磁盘上读取数据,而将中间数据的单一副本写入本地磁盘则可以节省网络带宽。第三,冗余执行可以用来减少慢速机器的影响,并处理机器故障和数据丢失。
Acknowledgements
Josh Levenberg根据他使用MapReduce的经验和其他人的改进建议,对用户级MapReduce API进行了修订和扩展,增加了许多新功能。MapReduce从Google文件系统中读取其输入,并将其输出写入Google文件系统[8]。我们要感谢Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung和Josh Redstone在开发GFS方面所做的工作。我们还要感谢Percy Liang和Olcan Sercinoglu在开发MapReduce使用的集群管理系统方面的工作。Mike Burrows、Wilson Hsieh、Josh Levenberg、Sharon Perl、Rob Pike和Debby Wallach对本文的早期草稿提供了有益的意见。OSDI的匿名审稿人和我们的监护人Eric Brewer对本文可以改进的地方提供了许多有用的建议。最后,我们感谢谷歌工程部门的所有MapReduce用户提供的有益反馈、建议和错误报告。
References
有兴趣自己看原文
A WordFrequency
本节包含一个程序,用于计算在命令行上指定的一组输入文件中每个独特单词的出现次数。
Last updated