MIT 6.824/6.5840 分布式系统 Lab 1: MapReduce

本次 Lab 1 讲的是分布式计算的 MapReduce。

#1. 背景

MapReduce 源自 Google 在 OSDI '04 发表的论文,论文的作者是 Jeff Dean 和 Sanjay。主要提出了一种简单、可拓展的分布式计算编程模型。

对于用户来说,只需要实现 Map()Reduce() 两个函数,即可享受到分布式计算的好处,这就是 MapReduce 的设计理念。

#思想

MapReduce 需要用户将计算过程表达成以下两个函数:

  • Map(key, value) -> list(ikey, ivalue)
  • Reduce(ikey, list(ivalue)) -> list(ivalue)

到这里大家可能已经感受到一点并行的思想,其实 MapReduce 是把一个任务分解成一系列子任务,每个子任务由 Map 函数去执行。这些子任务的输出会被重组,然后分门别类送到 Reduce 函数去执行。

MapReduce 的 Motivation 是:

  • 很多大数据计算任务都是大量输入对应到小量的输出;
  • 分而治之,一个复杂计算任务往往可以分成多个更简单的部分;
  • 子任务之间往往可以并行执行; -> Map

怎么区分”多个更简单的部分”?使用ikey标示子任务。

#2. MapReduce 程序示例

#Word Count

Word count任务是统计一批文本中每个词出现的数量

Map任务:统计一个分块中的词频

Reduce任务:合并所有分块的词频

#Grep

Map任务:对一个分块做grep

Reduce任务:合并所有的grep结果

#计算图的逆

已知所有的<source, target>边,计算每个target被哪些source引用

Map任务:逆转一组边,得到<target, source>

Reduce任务:按target合并所有的source

#分布式排序

Map任务:原样输出

Reduce任务:原样输出

#3. 框架实现

MapReduce 框架负责调度和执行用户编写的 Map 和 Reduce 任务。

根据环境(机器配置,网络拓扑,故障概率,和存储成本)不同,可以有很多不同的实现方案

#概览

框架首先将输入数据分成MM块,然后对每一块分别执行map过程,这个过程是在多个机器上并行执行的;

map全部完成后,将中间结果按照某些策略分成RR块,然后并行执行reduce过程;

#主控节点数据结构

  • 任务状态:任务类型(Map or Reduce),执行状态(进行中 or 已完成),所在的机器
  • 数据状态:中间结果的存储位置和大小

#分布式容错设计

Worker节点的保障:Master节点会周期性地ping Worker节点;失联Worker节点已完成的任务会重新调度。

Master节点的保障:Master节点会周期性做checkpoint方便故障时重新开始。

出错时的语义保证:需要实现Map/Reduce任务提交是原子性的

  • Map任务通过提交临时文件名,由master做改名实现。
  • Reduce任务完成时会把工作临时文件改名为目标文件。

#程序局部性

分析局部性主要是考虑到带宽资源的限制。

论文中实现的 MapReduce 框架是建立在 GFS (Google File System) 上的,GFS 会对文件进行分块然后保存三份副本。

作者在实现时考虑到了底层GFS的影响,会尽量调度任务到有副本或者距离副本近的机器上。

#任务划分粒度

首先,输入数据会被划分成MM 个分块调用 Map,然后中间结果会被划分成RR 个分块调用 Reduce

理想情况下,MMRR 应该远远大于 Worker 机器数量:利于负载均衡、故障恢复、和任务均匀分发;

取值上界:受限于主控节点的Memory,空间复杂度=O(M×R)O(M\times R)

实践经验:RR 影响中间结果文件的数量

Google经验:2,000台Worker机器;M=200,000M=200,000R=5,000R=5,000

#备用任务

一般来说,MapReduce框架计算的整体速度受制于少数“掉队者”,掉队的原因有很多,例如机器故障;

当整个计算过程将要完成时,主控节点会将剩余的任务多安排一次(备用任务)。

#4. 一些细节优化

#分区函数

提供给用户的分区函数,方便开发用;

#数据处理顺序

MapReduce框架保证同一个分区内,数据是按照key有序处理的,方便用户通过key访问结果文件;

#合并器 Combiner

在一些任务中,中间结果划分不是均匀的,可能会导致大量中间结果分配到同一个Reduce任务去,打满网络带宽。

例如Word count任务,会有大量的<the, 1>记录;

为此MapReduce框架设计了一个可选的 Combiner 函数,在 Map 之后先对结果进行一次合并,再通过网络发出去;

#输入输出类型

MapReduce框架是支持多种输入文件格式,也支持用户自己实现一个reader接口。

#副作用

有时候用户在处理数据时可能想要一些辅助文件,这些是通过统一的一层Application Writer完成,这一层会处理原子更新文件(先写到临时文件然后再改名)之类的细节;

对于多文件的情况,没有提供原子的两阶段提交,这一点需要用户自己保证;

#跳过坏记录

有时候用户的MapReduce可能会由于bug崩溃,MapReduce框架提供了一种可选的执行模式,可以自动检测确定性的崩溃数据,然后跳过这些记录;

框架每次调度任务时会附带一个Sequence Number。每个Worker进程都会注册Signal Handler,捕获SegFault和BusError。当程序崩溃时,框架会给主控节点发一个"Last gasp"UDP数据包带上这个Seq Number。当框架检测到某一个记录崩溃超过一次,下次就会跳过。

#本地执行

为了方便用户Debug,框架提供了一种本地执行模式。

#状态信息

主控节点跑了一个HTTP服务,暴露内部的一些状态信息。

#计数器

框架提供了一个Counter对象,方便用户实现一些计数需求(可能是为了san check)

#5. 性能测试

#6. 实践经验

#大规模索引任务

收益巨大:

  • 简化业务逻辑:索引代码更简单、更小、更容易理解,因为处理容错、分布和并行化的代码隐藏在 MapReduce 库中。
  • 关注点分离:MapReduce 库的性能足够好,我们可以将概念上不相关的计算分开,而不是将它们混合在一起以避免对数据进行额外的传递。这使得更改索引过程变得容易。
  • 简化横向拓展:机器、网络导致的问题被框架处理了,使得横向拓展更简单,只需要加机器即可。

#回到 Lab 1

Lab 1 是实现 MapReduce 框架的两个组件 Coordinator(Master)和 Worker。

  • Master 负责分发任务给 Worker,会启动一个 RPC Server;
  • Worker 从 Master 那里获取 MapReduce 任务并执行。

整个 MapReduce 框架包括:

  • main/mrcoordinator.go 是 Master 的入口,会调用 MakeCoordinator() 然后等待;
  • main/mrworker.go 是 Worker 的入口,会加载用户程序(so库)获得 MapfReducef,然后调用 Worker;
  • mr/coordinator.go 实现 Master 的逻辑;
  • mr/worker.go 实现 Worker 的逻辑;
  • 用户程序会被编译成 go plugin,被框架加载然后调用。