MIT 6.824/6.5840 分布式系统 Lab 1: MapReduce
本次 Lab 1 讲的是分布式计算的 MapReduce。
#1. 背景
MapReduce 源自 Google 在 OSDI '04 发表的论文,论文的作者是 Jeff Dean 和 Sanjay。主要提出了一种简单、可拓展的分布式计算编程模型。
对于用户来说,只需要实现 Map()
和 Reduce()
两个函数,即可享受到分布式计算的好处,这就是 MapReduce 的设计理念。
#思想
MapReduce 需要用户将计算过程表达成以下两个函数:
Map(key, value) -> list(ikey, ivalue)
Reduce(ikey, list(ivalue)) -> list(ivalue)
到这里大家可能已经感受到一点并行的思想,其实 MapReduce 是把一个任务分解成一系列子任务,每个子任务由 Map
函数去执行。这些子任务的输出会被重组,然后分门别类送到 Reduce
函数去执行。
MapReduce 的 Motivation 是:
- 很多大数据计算任务都是大量输入对应到小量的输出;
- 分而治之,一个复杂计算任务往往可以分成多个更简单的部分;
- 子任务之间往往可以并行执行; ->
Map
怎么区分”多个更简单的部分”?使用ikey
标示子任务。
#2. MapReduce 程序示例
#Word Count
Word count任务是统计一批文本中每个词出现的数量
Map任务:统计一个分块中的词频
Reduce任务:合并所有分块的词频
#Grep
Map任务:对一个分块做grep
Reduce任务:合并所有的grep结果
#计算图的逆
已知所有的<source, target>
边,计算每个target
被哪些source
引用
Map任务:逆转一组边,得到<target, source>
Reduce任务:按target
合并所有的source
#分布式排序
Map任务:原样输出
Reduce任务:原样输出
#3. 框架实现
MapReduce 框架负责调度和执行用户编写的 Map 和 Reduce 任务。
根据环境(机器配置,网络拓扑,故障概率,和存储成本)不同,可以有很多不同的实现方案
#概览
框架首先将输入数据分成块,然后对每一块分别执行map
过程,这个过程是在多个机器上并行执行的;
在map
全部完成后,将中间结果按照某些策略分成块,然后并行执行reduce
过程;
#主控节点数据结构
- 任务状态:任务类型(
Map
orReduce
),执行状态(进行中 or 已完成),所在的机器 - 数据状态:中间结果的存储位置和大小
#分布式容错设计
Worker节点的保障:Master节点会周期性地ping Worker节点;失联Worker节点已完成的任务会重新调度。
Master节点的保障:Master节点会周期性做checkpoint方便故障时重新开始。
出错时的语义保证:需要实现Map/Reduce
任务提交是原子性的
Map
任务通过提交临时文件名,由master做改名实现。Reduce
任务完成时会把工作临时文件改名为目标文件。
#程序局部性
分析局部性主要是考虑到带宽资源的限制。
论文中实现的 MapReduce 框架是建立在 GFS (Google File System) 上的,GFS 会对文件进行分块然后保存三份副本。
作者在实现时考虑到了底层GFS的影响,会尽量调度任务到有副本或者距离副本近的机器上。
#任务划分粒度
首先,输入数据会被划分成 个分块调用 Map
,然后中间结果会被划分成 个分块调用 Reduce
。
理想情况下, 和 应该远远大于 Worker 机器数量:利于负载均衡、故障恢复、和任务均匀分发;
取值上界:受限于主控节点的Memory,空间复杂度=
实践经验: 影响中间结果文件的数量
Google经验:2,000台Worker机器;;
#备用任务
一般来说,MapReduce框架计算的整体速度受制于少数“掉队者”,掉队的原因有很多,例如机器故障;
当整个计算过程将要完成时,主控节点会将剩余的任务多安排一次(备用任务)。
#4. 一些细节优化
#分区函数
提供给用户的分区函数,方便开发用;
#数据处理顺序
MapReduce框架保证同一个分区内,数据是按照key有序处理的,方便用户通过key访问结果文件;
#合并器 Combiner
在一些任务中,中间结果划分不是均匀的,可能会导致大量中间结果分配到同一个Reduce
任务去,打满网络带宽。
例如Word count任务,会有大量的<the, 1>
记录;
为此MapReduce框架设计了一个可选的 Combiner
函数,在 Map
之后先对结果进行一次合并,再通过网络发出去;
#输入输出类型
MapReduce框架是支持多种输入文件格式,也支持用户自己实现一个reader
接口。
#副作用
有时候用户在处理数据时可能想要一些辅助文件,这些是通过统一的一层Application Writer
完成,这一层会处理原子更新文件(先写到临时文件然后再改名)之类的细节;
对于多文件的情况,没有提供原子的两阶段提交,这一点需要用户自己保证;
#跳过坏记录
有时候用户的Map
,Reduce
可能会由于bug崩溃,MapReduce框架提供了一种可选的执行模式,可以自动检测确定性的崩溃数据,然后跳过这些记录;
框架每次调度任务时会附带一个Sequence Number。每个Worker进程都会注册Signal Handler,捕获SegFault和BusError。当程序崩溃时,框架会给主控节点发一个"Last gasp"UDP数据包带上这个Seq Number。当框架检测到某一个记录崩溃超过一次,下次就会跳过。
#本地执行
为了方便用户Debug,框架提供了一种本地执行模式。
#状态信息
主控节点跑了一个HTTP服务,暴露内部的一些状态信息。
#计数器
框架提供了一个Counter对象,方便用户实现一些计数需求(可能是为了san check)
#5. 性能测试
#6. 实践经验
#大规模索引任务
收益巨大:
- 简化业务逻辑:索引代码更简单、更小、更容易理解,因为处理容错、分布和并行化的代码隐藏在 MapReduce 库中。
- 关注点分离:MapReduce 库的性能足够好,我们可以将概念上不相关的计算分开,而不是将它们混合在一起以避免对数据进行额外的传递。这使得更改索引过程变得容易。
- 简化横向拓展:机器、网络导致的问题被框架处理了,使得横向拓展更简单,只需要加机器即可。
#回到 Lab 1
Lab 1 是实现 MapReduce 框架的两个组件 Coordinator(Master)和 Worker。
- Master 负责分发任务给 Worker,会启动一个 RPC Server;
- Worker 从 Master 那里获取
Map
或Reduce
任务并执行。
整个 MapReduce 框架包括:
main/mrcoordinator.go
是 Master 的入口,会调用MakeCoordinator()
然后等待;main/mrworker.go
是 Worker 的入口,会加载用户程序(so库)获得Mapf
和Reducef
,然后调用 Worker;mr/coordinator.go
实现 Master 的逻辑;mr/worker.go
实现 Worker 的逻辑;- 用户程序会被编译成 go plugin,被框架加载然后调用。