分布式系统
优势
-
通过并行计算增加容量 (parallelism)
-
通过复制容忍故障 (fault tolerance)
-
匹配物理设备的分布 (physical)
-
通过隔离来实现安全 (security / isolated)
- 将安全的和不安全的计算放在不同机器上运行
- 系统间通过网络进行交互
挑战
-
分布式系统有许多部分组成,这些部分是同时运行的,会遇到各种并发编程和复杂交互带来的问题。因此需要某些机制在时间上进行控制(比如超时机制,熔断机制)。
-
多台计算机加网络会使故障原因也更加复杂
-
性能,n 台计算机并不能达到 n 倍的性能
主题
-
一致性
通用的基础设施需要有明确的行为。例如,"Get(k) 获取最近一次 Put(k,v) 的值"。
实现良好的行为是很难的! "复制" 的服务器很难保持一致。
-
性能
目标:可扩展的吞吐量
Nx 个服务器,通过并行的 CPU、磁盘、网络实现 Nx 个总吞吐量。随着 N 的增长,扩展会变得更加困难,负载不平衡。有些事情不会随着 N 的增加而加快,例如初始化、交互。
-
权衡
容错性、一致性和性能是敌人。实现容错性和一致性需要通信
-
发送数据到备份
-
检查数据是否是最新的。
-
通信通常很慢,而且不可扩展
-
许多设计只提供弱的一致性,以获得速度。
- 例如,Get() 并不*产生最新的 Put()! 对于应用程序的程序员来说,这是很痛苦的,但可能是一个很好的权衡。
我们会在一致性/性能中看到许多设计点。
-
-
实现
RPC、线程、并发控制。
Lab
-
Map-Reduce
-
Raft 解决容错性
-
使用 Raft 构建 K/V server,它可以被复制
-
Sharded K/V server 将有可复制能力的主备 K/V server 克隆到多个组中,并将之前的数据分割存储到这些组中,提高运行速度(每个组只存储自己对应的数据,组合起来就是一整份数据)。同时还要实现在不同的服务期间移动数据,保证不会丢失(数据分片到各个组中,各组的服务器内也会有主从复制)。
Map-Reduce
以一个 word-count 为例,如果集群要对上万的文件进行计算,GFS 会先寻找到文件的所在位置,然后直接在本机的 map-reduce 程序中运行,从而节约了大量的网络传输。
将按行存储转换为按列存储的过程,在论文中成为 shuffle
概述
背景:在多 TB 级数据集上进行多小时的计算,例如,建立搜索索引,或排序,或分析网络的结构,只有在有 1000 台计算机的情况下才实用。
但是应用不是由分布式系统专家编写的,它的总体目标是让非专业的程序员也能轻松使用,对于程序员来说,他只需要定义 Map 和 Reduce 函数 (通常是相当简单的同步代码). MR 管理并隐藏了分布式的所有细节!
一个 MapReduce 作业的抽象视图
输入 1 -> Map -> a,1 b,1
输入 2 -> Map -> b,1
输入 3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,1
| -----> Reduce -> b,2
---------> Reduce -> a,2
-
- 输入文件(已经)被分成 M 个文件
-
- MR 对每个输入文件调用 Map(),产生一组 k2, v2 的 "中间" 数据,每个 Map() 调用都是一个 "任务"
-
- 当地图被 Reduce 时。MR 会收集给定 k2 的所有中间 v2。并将每个键和值传递给一个 Reduce 调用
-
- 最终输出是来自 Reduce() 的<k2,v3>对的集合。
以 wordcount 为例:
- Map(k, v) 将 v 分割成单词
- 对于每个词 w, emit(w, "1")
- Reduce(k, v_set)
- emit(len(v_set))
MapReduce 系统的优缺点
-
MapReduce 的扩展性很好。
N 个 worker 计算机(可能)让你获得 Nx 的吞吐量。Maps() 和 Reduce() 可以并行运行,因为它们不相互影响。因此,更多的计算机可以带来更多的吞吐量!
-
MapReduce 隐藏了很多细节。
- 发送应用代码到服务器
- 跟踪哪些任务已经完成
- 将中间数据从 Maps "洗" 到 Reduce 中去
- 平衡服务器上的负载
- 从故障中恢复。
-
MapReduce 限制了应用程序可以做的事。
- 没有互动或状态 (除了通过中间输出)
- 没有迭代
- 没有实时或流式处理
-
MapReduce 输入和输出都存储在 GFS 集群文件系统上
- MR 需要巨大的并行输入和输出的吞吐量。
- GFS 将文件分割到许多服务器上,以 64MB 为一个块。
- Map 并行读取
- Reduce 并行写入
- GFS 还将每个文件复制到 2 或 3 个服务器上
MR 的工作细节
-
MapReduce 需要一个协调器,将任务分配给 worker 并纪录进度。
协调器将 Map 任务分配给 worker,直到所有的 Map 完成。
- Map 将输出(或者说中间数据)写到本地磁盘上
- Map 通过哈希将输出分割到每个 Reduce 任务的一个文件中。
在所有 Map 完成后,协调器将 Reduce 任务分配给 worker
- 每个 Reduce 任务从(所有)Map worker 那里获取其中间输出。
- 每个 Reduce 任务在 GFS 上写入一个单独的输出文件
-
什么可能会限制性能?
CPU、内存、磁盘、网络?在 2004 年,论文作者受到了网络容量的限制。
MR 在网络上发送什么?
- Map 从 GFS 读取输入。
- Reduces 读取 Map 的中间输出。通常和输入一样大,例如用于排序。
- Reduces 写输出文件到 GFS。
在 MR 的 shuffle 过程中,一半的流量要经过根交换机。
论文的根交换机速度为 100 ~ 200 Gb/s,总共有 1800 台机器,所以每台机器可以分得 55 Gb/s。相比于磁盘或 RAM 的速度小得多。
-
MR 如何尽量减少网络的使用?
-
协调器试图在存储其输入的 GFS 服务器上原地运行每个 Map 任务。所有的计算机都会同时运行 GFS 和 MR worker, 所以 Map 的输入都会通过 GFS 在本地磁盘读取,而不是网络。
-
中间数据被分割为许多文件,每个文件都存储了许多 key. 文件数量比 key 要少得多,大文件传输的效率要更高
-
-
MR 如何处理负载均衡?
如果 N-1 个服务器必须等待 1 个慢速服务器完成,则是浪费和缓慢的。但有些任务可能确实比其他任务花的时间更长。
解决方法:比 worker 数量多得多的任务
- 协调器将新的任务分配给完成先前任务的 worker。
- 因此,没有一个任务大到可以支配完成时间(希望如此)。
- 因此,快的服务器会比慢的服务器做更多的任务,完成的时间也差不多。
-
MR 的容错性如何?
如果一个 worker 在 MP 任务中崩溃了怎么办? MR 会对程序员隐藏故障。
MR 不必从头开始重新运行整个工作,它只重新运行失败的 Map 和 Reduce。假设 MR 将一个 Map 运行了两次,一个 Reduce 看到了第一次运行的输出。另一个 Reduce 看到了第二次运行的输出?
正确性要求重新执行时产生完全相同的输出。所以 Map 和 Reduce 必须是纯确定性的函数。它们只允许看它们的参数/输入。没有状态,没有文件 I/O,没有交互,没有外部通信。
如果你想允许 non-functional 的 Map 或 Reduce 呢?worker 失败将重新执行整个工作。或者是回滚到某个全局检查点。
-
崩溃恢复的细节
-
一个 Map worker 崩溃了。
- 协调器注意到 worker 不再响应 ping
- 协调器知道哪些 Map 任务在该 worker 上运行
- 这些任务的中间输出现在已经丢失,必须重新运行
- 协调器通知其他 worker 运行这些任务
- 如果所有的 Reduce 任务都获取了中间数据,可以不重新运行。
-
一个 Reduce worker 崩溃了。
- 完成的任务是好的 -- 已经存储在 GFS 中,并且保存有副本。
- 协调器在其他 worker 上重新启动未完成的任务。
-
-
其他故障/问题。
-
如果协调者给两个 worker 分配了相同的 Map 任务怎么办?
= 也许协调器错误地认为一个 worker 死了。它将只告诉 Reduce worer 其中一个。
-
如果协调者给了两个 worker 同样的 Reduce() 任务怎么办?
- 他们都会试图在 GFS 上写下同一个输出文件!
- GFS 的原子重命名可以防止混合;一个完整的文件将是可见的。
-
如果一个 worker 非常慢 --"散兵游勇",怎么办?
- 也许是硬件它弱。
- 协调器启动最后几个任务的第二个副本。
-
如果一个 worker 由于硬件或软件损坏而计算出不正确的输出,怎么办?
- 太糟糕了!MR 假设 "故障停止" 的 CPU 和软件。
-
如果协调器崩溃了怎么办?
-
-
目前的状况?
- 影响力巨大(Hadoop, Spark, &c)。
- 可能谷歌已经不使用了。
- 被Flume/FlumeJava取代(见Chambers等人的论文)。
- GFS 被 Colossus(没有好的描述)和 BigTable 取代。
-
结论
MapReduce 使大集群计算流行起来。
- 不是最有效或最灵活的。
- 扩展性好。
- 易于编程
这些在实践中是很好的权衡。现在已经有了一些更高级的继承者。