6.824 Lecture 1 Introduction Notes & Paper Reading

Posted on 一 28 六月 2021 in course • 4 min read

1 写在前面

MIT 6.824 分布式系统 是一门研究生课程,主要关注的内容是分布式系统相关的抽象及实现技术,包含容错、复制、一致性等主题。方式上是通过研读分布式领域的经典论文,分析和讨论这些论文包含的系统实现来进行学习和理解分布式系统。

从课程的论文来看,偏向工程实践,包含了业界经典的分布式系统工程论文,比如来自谷歌的 MapReduce、GFS、Spanner,也包含了 Zookeeper、Spark、Memcached 等流行的开源分布式中间件,此外还有 Bitcoin 等相对新的分布式系统。

课程还包含了 4 个 Lab,引导学生从实现经典的 MapReduce 到实现分布式一致性算法 Raft 来进行容错的复制集群,最终会实现一个基于 Raft 的分片 kv 分布式存储系统。从 Lab 的设计来看,这门课程包含了不少的动手编码内容,有一定的挑战性。

对于本课程的学习,个人计划是按照课程的 Schedule 进行,阅读每节课相关的论文或者材料,然后观看每节课的课堂录像,并进行课堂笔记的记录,看完后再对笔记进行整理和补充个人的思考,加上个人对每节课论文的分析和阅读思考补充为每节课的总结文章。最新的课堂录像是 2020 年的课程,发布在 Youtube 上,国内可以在 B 站 找到。

2 概要

第一节课主要是对整个课程的介绍,对分布式系统要解决的问题和面临的挑战进行了概括,然后是对本课涉及到的论文 MapReduce 进行了解读。因为课程后面比较多的学生进行了提问,所以本课对 MapReduce 并没有进行完整的讨论,缺失的内容可以参考往年课堂完整的 Note。

本课相关的材料:

3 要点

3.1 为什么需要分布式系统?

  • 扩容: 通过并行提升系统的容量,比如多个服务器可以分散处理请求和数据存储,这里容量不同类型系统不一样,包括吞吐量和数据存储等;
  • 容错: 主要是通过复制来实现容错;
  • 物理分割: 一些系统为了靠近外部依赖或者服务的其他实体,物理上就存在分布的状态;
  • 安全隔离: 基于安全性,部分系统需要分布式实现;

3.2 分布式系统面临的挑战

  • 并发: 无论是处理请求的并发还是系统内部组件之间的并发交互,对于实现一个可靠分布式系统来说都是必须要去解决的问题;
  • 局部错误: 单台服务器或者电脑,发生硬件错误可能是一两年的频率,但是对于一个有着上千台服务器的大型集群来说,硬件错误每天都是必然的事件,网络、电源、磁盘,每天都可能会发生错误,一个可靠的分布式系统必须要良好应对硬件错误;
  • 性能: 系统的性能是否可以随着服务器数量线性提升?这是一种理想状态,实际上是很难实现,而且分布式的系统往往带来了更复杂的情况;

3.3 基础设施

本课程主要关注的是服务端基础设施类的软件系统:

  • 存储: 从数据存储到更底层的文件系统;
  • 通讯: 分布式系统组件之间的通讯网络协议;
  • 计算: 分布式的计算模型;

一个大主题: 抽象与简化分布式存储和计算基础设施的接口便于构建应用和对应用隐藏分布式系统的内部复杂性。这是个很困难的事情。

3.4 课程主题

  1. 实现
    • RPC: 对调用方隐藏实现是通过不可靠网络通讯得到的结果;
    • Threads: 多核、并发,简化实现操作;
    • Concurrency Control: 锁,处理竞态,保证正确性;
  2. 性能

    • 扩展性: 理想状态 → 系统性能可以随着硬件线性增长
    • 系统性能: 吞吐量、容量;
    • 一些性能无法通过增加机器数量提升: 单个用户的请求响应时间,所有用户同时更新同一个数据(涉及到数据竞争) ;
  3. 容错

  4. 单台服务器可以稳定运行很久;
  5. 服务器数量多的时候,错误不是随机或者罕见的事件,而是必然事件,总是会有机器出现问题;
  6. 分布式系统需要考虑容错性才能对应用隐藏系统的内部复杂性;
  7. 系统的可用性 Availability: 在错误发生时总是能对外提供正常的服务 ;
  8. 可恢复性 Recoverability: 无法应对的错误修复后,服务能正常恢复,尽可能保持错误发生前的状态;
  9. 方案

    • 持久存储 、非易失存储→ 硬盘,SSD,记录数据 checkoutpoint,服务恢复后读取数据恢复状态;
    • 复制集群: 数据复制到多台服务器上;
  10. 其他主题

  11. 一致性: 数据、状态 → 多副本情况下,因为缓存或者同步的问题,需要考虑数据的一致性
  12. 强一致性: 保证数据的一致性 → 从所有节点都可以看到最新的数据 → 可用性受影响 → 更多的数据通讯 → 异地,跨大洲的复制节点对强一致性有更大的性能损耗;
  13. 弱一致性: 最终一致;

3.5 MapReduce

意义和起源

  • 大量的数据: 数十 BT;
  • 数千台服务器;
  • 分布式的任务需要专家程序员写分布式的代码去分布处理任务;
  • 需要一个易用的框架,方便实现分布式任务,并对工程师隐藏分布式的复杂;

Map

  • 关注输入文件
  • 将文件分散为多个文件,多个 Map 任务
  • 输出处理的中间文件 → k/v

Reduce

  • 收集 Map 任务产生的中间文件
  • 按照规则聚合中间文件的数据
  • 输出聚合结果到最终文件: k → count

Map 和 Reduce 都是任务,N 个 Map 和 M 个 Reduce 可以分布到多台服务器上执行,可以达到 N 倍的性能提升。

Word Count: 计算文件中每个单词出现的数量

  • Map(k, v): k 是文件名,v 是文件的内容

    bash Map(k, v) split v into words for each word w emit(w, "1")

  • Reduce(k, v): k 是单词,v 是单词列表

    bash Reduce(k, v) emit(len(v))

一个真实的 MapReduce 工程是可能存在多个阶段的 Map/Reduce , 构成一个 pipeline 处理流,得到最终需要的结果。

计算模型特性

  • 纯函数无副作用、无外部依赖状态;
  • 计算模型需要可抽象为 Map/Reduce: 不支持无法抽象为 Map/Reduce 的计算任务;
  • 网络对当年的 MapReduce 存在很大的限制 → 50 M/s ;
  • 一些任务可能需要大量的数据复制: 比如排序任务,需要全量的数据进行移动;

谷歌中的 MapReduce 底层依赖 GFS 。

4 Paper

本课的论文是来自谷歌 2004 年发表的 MapReduce,这是一个当年在谷歌基础设施中被广泛应用于大数据处理任务的编程框架,工程师只需要定义好 Map 和 Reduce 两种函数,就可以利用实现好的框架库在数千台服务器中并行执行大数据处理任务。这篇论文和 GFS、BigTable 并称为谷歌大数据三大论文, 一起催生了谷歌大数据基础设施的开源版本 Hadoop 。Hadoop 成为今后十多年大数据领域占用绝对地位的基础设施,至今,Hadoop 生态不断发展,依旧是大数据相关业务基础设施的最佳选择。

4.1 Map/Reduce 计算模型

Map/Reduce 计算模型源自函数编程语言,是用于处理列表类型数据的高阶函数,支持传入一个函数和列表,输出对列表应用传入函数的结果。以 Common Lisp 为例,Map 函数定义及示例如下:

(map result-type function sequence1 sequence2...)

; 示例
(map 'vector #'* #(1 2 3 4 5) #(10 9 8 7 6)) ==> #(10 18 24 28 30)

Map 函数接受一个 N 参数的函数和 N 个序列,将 N 个序列同序号元素作为函数的参数,应用后将函数输出结果连接起来作为一个新的序列返回。如以上示例,vector 是返回的序列类型,* 是函数,#(1 2 3 4 5)#(10 9 8 7 6) 是输入的两个序列,执行结果是两个序列同序号元素应用函数乘 * 后的结果序列 #(10 18 24 28 30)

Reduce 函数定义及示例如下:

(reduce function sequence &key :from-end :start :end :initial-value)

; 示例
(reduce #'+ #(1 2 3 4 5 6 7 8 9 10)) ==> 55

Reduce 函数接受一个 2 参数的函数和一个序列,首先将该序列的前 2 个元素作为函数参数调用得到一个结果,然后将结果和后一个元素再次作为函数参数调用,依次一直到最后一次函数调用,得到最终的结果。如上示例,函数是加 +,示例表达式的效果是将序列里面的所有元素相加。

4.2 MapReduce 编程模型

在本论文中,MapReduce 的编程模型与 Map/Reduce 不太一样,计算任务被抽象为接收和产生 Key/Value 对的 Map 和 Reduce 函数,并且由用户根据计算定义提供给 MapReduce 框架进行执行。Map 函数接受输入的 Key/Value 对,然后产生中间 Key/Value 对,MapReduce 库将中间数据相同 Key 的 Value 数据聚合起来,再交给 Reduce 函数计算,然后输出最终的计算结果 Key/Value 对。简化表达如下:

map(k1,v1) → list(k2,v2)
reduce(k2,list(v2)) → list(v2)

论文中举了一个计算文件中每个单词出现数量的计算任务:

map(String key, String value): 
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values): 
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
  • map 函数的参数 Key 是文件名,Value 是文件内容,函数体就是对文件内容 value 的每个单词直接输出一个 key/value 对到中间文件,key 是该单词,value 是 "1" 表示该单词出现了一次。
  • reduce 函数的参数 Key 是某个单词,Values 是中间文件中该 Key 的所有 value 列表,也就是一堆的 "1" 数据。函数体就是对将 Values 列表的数据转换为 Int 数值,然后加起来,最终输出一个结果。

从上面的编程模型来看,MapReduce 对于计算任务类型是有一定的要求的,需要能够将计算抽象为 Map 和 Reduce,并且应当是无副作用的。这样,才可以把大量数据的计算任务拆分为并行的处理任务分发到大量的服务器上进行计算。

4.3 架构与流程

mapreduce

上图是 MapReduce 计算任务整体的一个架构和执行流程,MapReduce 是以一个库的形式存在,用户程序加载 MapReduce 库然后拷贝到整个集群的所有服务器上。在不同的服务器上,程序有 Master 和 Worker 两种运行模式,其中 Master 负责和其他 Worker 程序交互分发 Map 或者 Reduce 任务及记录状态等元数据。而 Worker 则分布在大量的服务器上分别执行用户程序定义的 Map 任务或者 Reduce 任务。

在一个计算任务启动时,MapReduce 库会将任务数据分割为 M 个小文件,大小一般从 16 M 到 64 M ,这个主要是与底层依赖的 GFS 特性相关。

Master 主要保存以下元数据:

  • 每个 Map 或者 Reduce 任务的状态: idle, in-process, completed ;
  • 每个 Worker 节点的唯一标识;
  • 每个已完成的 Map 任务,保存其产生的 R 个中间文件的位置和大小,用于分发 Reduce 任务;

Worker 根据被分发的任务类型会有不同的执行:

  • Map 任务:
    1. 读取输入的文件片段内容,解析得到键值对数据,并且将每组键值对数据传给用户定义的 Map 函数执行,然后将产生的中间结果键值对数据缓存在内存中;
    2. 缓存在内存中的数据将会被定时写到本地磁盘中,并且根据用户定义的分片函数,将数据写到本地磁盘上 R 个文件中,然后再把这些文件的位置信息传回给 Master 节点;
  • Reduce 任务:
    1. 启动后会根据传入的中间文件位置,通过远程调用的方式读取 Map 任务的本地文件所有内容,然后将所有键值对数据按照 Key 排序,并且同一个 Key 的数据聚合在一起;
    2. 聚合好的 Key 和 Value 列表将会被传给用户定义的 Reduce 函数执行,结果将会被追加写到这个 Reduce 任务的最终输出文件;
    3. 当中间数据文件内容过大内存无法容纳时,将会采用外排的方式进行处理;

4.4 容错机制

MapReduce 在谷歌中是设计来运行在成百上千的普通服务器中处理大量的数据的,错误是必然会发生的事情,MapReduce 需要有相关的容错机制来应对各种可能发生的错误。论文中提及了几种错误情况的应对,主要是从整个 MapReduce 中的各个角色来进行讨论的。

4.4.1 Worker 失效

Worker 失效是由 Master 负责处理的,Master 会定时 ping 每个 Worker 来保持状态。当一个 Worker 超时未响应时,Master 就会将该 Worker 标记为失效状态,并对该 Worker 执行的任务进行如下处理:

  1. 该 Worker 完成的 Map 任务将会被重置为空闲状态,由 Master 再调度其他 Worker 执行;
  2. 该 Worker 进行中的 Map 任务和 Reduce 任务也会被重置为空闲状态;

已完成的 Map 任务需要重新执行是因为考虑到 Map 任务产生的中间文件是保存在 Worker 本地磁盘上的,所以如果该 Worker 失效,并不能确保数据还是有效的。而已完成的 Reduce 任务不重新执行是因为 Reduce 任务的输出结果是保存在全局的文件系统 (GFS) 中的,所以不需要再重新执行。

4.4.2 Master 失效

MapReduce 中 Master 是单点的,对于 Master 失效的容错处理方案,论文中采用的是定时将 Master 节点的数据写下来,在 Master 挂掉之后,启动一个新的 Master 节点,然后读取上个检测点数据恢复服务。

4.4.3 失效处理机制

当用户提供的 Map 和 Reduce 函数是确定性函数时,MapReduce 框架需要保证重复执行时,函数的输出都是一致的,就好像整个程序没有发生错误一样。在 MapReduce 中,主要是通过对 Map 和 Reduce 任务的输出内容进行原子提交来实现这个特性。

首先每个进行中的任务都会将其输出写到一个私有的临时文件,Reduce 任务会产生一个这样的文件,而 Map 任务则会产生 R 个,R 与 Reduce 任务数量一致。不同任务完成后的处理不一样:

  • Map 任务完成后,会将 R 个临时文件的名字发生给 Master ,由 Master 记录下来作为任务的状态数据,已完成的 Map 任务发送的消息将会被 Master 忽略;
  • Reduce 任务完成后,Worker 会将临时文件重命名为最终输出的文件名称,对于 Reduce 任务重复执行在多个机器的情况,MapReduce 框架主要是依赖底层的 GFS 来保证文件重命名操作的原子性;

4.5 局部性

在谷歌当时的计算集群中,网络带宽是一个相对受限的资源,所有数据在计算时都通过网络进行传输会导致网络带宽成为系统的瓶颈。 MapReduce 的优化方案比较巧妙,主要是通过尽量让数据文件和执行任务在同样的机器上,减少需要通过网络传输的数据数量来解决。具体是:

  • 首先输入数据主要是通过 GFS 管理,大文件会被分割为 64 MB 的小文件,并且每个小文件都会有多个拷贝,通常是 3 拷贝;
  • Master 节点会记录每个小文件的位置信息,并且作为调度 Map 任务参考依据,尽量将 Map 任务调度到存储了该文件拷贝数据的服务器上执行;
  • 如果 Map 任务无法调度到存储了该文件数据服务器上执行,则尝试会将任务调度到一个接近存储了该文件任何一份拷贝数据的服务器上执行,比如同一个网络交换机下的服务器;

整体的考虑是,尽量不需要进行数据传输,如果无法达成,则降低数据传输的成本。

4.6 任务粒度

MapReduce 的一大设计思路是将一个大数据的处理任务分割为大量的小任务,让大量的服务器来并行处理,以达到加速计算的目的,这也是我们在算法中常见的 divide and conquer 方法。在实践中,任务的粒度也是需要考虑的内容,论文中对此进行了相关的论述。

以 Map 阶段的数量为 M,Reduce 阶段的数量为 R,理想状态下,M 和 R 的值应该远远大于 Worker 服务器的数量,这样才方便对 Worker 执行任务动态达到均衡的效果,并且在出现 Worker 节点失效的情况下,也可以加速恢复。

在 MapReduce 中,因为 Map 和 Reduce 任务的状态等信息都存在 Master 节点的内存中,所以实际上根据 Master 节点的硬件资源是存在一个上限的。在谷歌的实践中,通常是 200000 个 Map 任务和 5000 个 Reduce 任务,运行在 2000 台 Worker 服务器上。

4.7 任务备份

一个完整的 MapReduce 计算任务需要所有切分的 Map 和 Reduce 任务全部完成才结束。在实践中,常见的一个导致 MapReduce 任务执行时间过长的情况是某个机器上执行的一些 Map 或者 Reduce 任务卡住了,导致任务一直无法完成。比如磁盘出现异常的服务器可能会导致磁盘读取性能大幅度下降,影响到了任务的执行。

MapReduce 中设计了一种任务备份机制来降低这种异常的影响。主要实现是,当 MapReduce 计算操作接近完成时,对于当前还在执行中的 Map/Reduce 任务,Master 节点会对应调度一个备份的任务执行。原始的任务和备份的任务中,只要有一个执行完毕,Master 就会将该任务标记为完成。

任务备份机制的关键在于开始备份任务重新执行的阈值,这个根据不同的计算任务特性,应该有不同的具体值。此外,考虑到备份执行任务会导致计算资源的使用增加,所以需要在资源增加和计算加速之间取个平衡点。

4.8 优化扩展

除了以上提及的具体实现之外,MapReduce 同时还存在着一些特殊的优化扩展点,论文中也提及了不少,值得参考。

4.8.1 分片函数

在使用 MapReduce 时,通常是由用户来指定想要的 Reduce 任务和输出文件数量,数据通过使用一个针对 Map 产生的中间文件的 Key 进行分片的函数来进行分片处理。默认的分片函数是 hash(key) mod R ,这个函数可以产生相对均衡的分片结果。但是在实际应用中,不同的任务类型可能会对分片有不同的一个实际需求,比如对于 URL 的 Key,通常我们希望同一个 Host 的结果会到同一个文件中。MapReduce 库中提供了一个特殊的分片函数来支持这个特性,比如 hash(Hostname(urlkey)) mod R 可以满足刚刚提到的那个需求。

4.8.2 顺序保证

MapReduce 保证在单个分片中,中间内容 Key/Value 对是以 Key 的升序排序处理的。这个顺序保证方便生成每个分片有序的输出文件,方便实现支持高效的随机查找 Key。

4.8.3 组合函数

在一些场景中,Map 任务产生的中间 Key/Value 数据可能会存在比较大的重复性,比如计算单词出现次数的任务,初始实现是每个单词输出一个 的数据,同一个任务对于同一个单词会产生大量这样的键值对数据。而每个键值对数据都需要通过网络传输到单个 Reduce 任务进行处理。

在 MapReduce 中,对于这种情况,框架支持用户指定一个可选的组合函数来在数据被通过网络发送前对同样的 Key 进行局部的数据合并处理。组合函数是在每个执行 Map 任务的机器上执行,通常代码实现和用户的 Reduce 函数类似,区别在于 MapReduce 对执行结果处理方式。Reduce 函数的输出会写到一个最终输出文件中,而组合函数的输出则是写到一个将要发生给 Reduce 任务的中间文件。

4.8.4 输入输出类型

MapReduce 中支持不同的输入输出类型,提供了相关 reader 接口来支持从文本到用户自定义的类型。数据不一定来自文件,也可以来自数据库或者其他内存数据,只需要实现对应的 reader 就可以了。输出也类似,有不同的输出类型支持,也支持用户自定义输出类型。

4.8.5 副作用

有时候用户可能会发现在 Map/Reduce 执行中输出一些临时辅助性的文件比较方便有用,这样 Map/Reduce 操作就是包含副作用的。MapReduce 依赖应用的 Writer 来保证这些副作用的原子性和幂等。通常应用会将内容写到一个临时文件,在完全生成后原子地重命名临时文件。

MapReduce 对单个任务产生多个输出文件支持二段提交来实现写文件的原子性,所以这种任务需要输出是确定性的,多次执行不会产生变化。

4.8.6 跳过坏记录

有时候用户的代码中可能会存在 bug 导致任务执行时处理特定的记录会必然崩溃,导致任务无法完成,如果这些 bug 是第三方库导致的也不好直接修复。对于一些可以允许跳过一些记录不对整体计算产生太大影响的任务来说,MapReduce 支持提供一个可选的执行模式,由 MapReduce 检测这些比如导致执行崩溃的记录,然后在下次执行时跳过这些记录继续执行。

实现上,每个 Worker 进程都会注册一个信号处理捕获内存段异常和 bug 错误信息。在 Worker 执行 Map/Reduce 操作之前,MapReduce 会存储操作参数的序列号到一个全局变量中。当执行出现异常崩溃时,信号处理器会发送这个参数的序列号到 Master 节点。当 Master 发现某个特定的记录出现错误超过一次时,就会在下一次重新执行时指示该记录应该被跳过。

4.8.7 本地执行

一个跑在数千台服务器上并行执行的 MapReduce 任务是非常难以调试的。为了方便调试,谷歌实现了一个在单台机器上顺序执行任务函数的 MapReduce 库版本,用户可以自行控制特定 Map 任务的执行。用户通过一个特殊的标记启动程序,就可以使用场景的调试和测试工具对任务进行处理。

4.8.8 状态信息

Master 节点运行了一个内部的 HTTP 服务器,暴露了一系列的状态页面提供给管理员查看。包含以下内容:

  1. MapReduce 计算的进度: 完成和进行中状态的任务数量、输入数据大小、中间数据的大小,输出数据的大小、处理速率等等;
  2. 到标准错误信息的连接及每个任务输出的标准输出文件;
  3. 失败的 Worker 节点,失败的 Map/Reduce 任务信息;

4.8.9 计数器

在计算任务执行过程中,基于统计等需求,总是需要对一些事件或者情况发生的次数进行计数处理。MapReduce 中提供了一个计数器的机制,用户可以在用户代码中创建一个 Counter,并在 Map 等任务处理中根据具体业务增加 Counter 值。MapReduce 框架会从 Worker 节点定时把某个任务的 Counter 信息在 ping 响应时汇报给 Master 节点,当一个任务执行完毕时,Master 节点会聚合计数器信息返回给用户代码。同时,Master 节点针对重复执行的任务汇报的计数器信息也会进行过滤处理,避免同个任务多次计数。计数器信息也会展示在 MapReduce 的状态页面上。

5 总结

本课主要还是针对分布式系统做了一个概括性的介绍,包含了什么是分布式系统,为什么需要分布式系统以及在当前,分布式系统存在那些挑战,我们整个课程关注的是分布式系统中哪些内容。通过本课的学习,基本能对分布式系统的领域及问题有一个初步的了解。

MapReduce 论文是一篇相对旧的论文,也是一篇非常经典的分布式系统方面的论文。从论文里面,我们可以看到,基于一个简单的模型,加上对问题域的简化,我们可以利用分布式系统来解决一个传统意义上单机非常难以解决的问题。论文中除了系统的架构和模型值得我们去关注之外,整个系统对于容错、恢复处理等的机制,也是很值得我们去参考的。在当前的业界中,这些思想仍然具备很大的价值。