跳到主要内容

· 阅读需 16 分钟

原文链接:https://lucumr.pocoo.org/2021/11/14/abusing-serde/

翻译:trdthg

选题:trdthg

本文由 Rustt 翻译,StudyRust 荣誉推出

Rust 冒险:滥用 Serde

当你让一个 Rust 程序员指出自己最喜欢的东西时,他们会很快的指出 serde 是一个让工作愉快好帮手。serde 是一个 Rust 的序列化和反序列化框架。它的格式相对独立,可以让你处理 JSON,YAML 以及一系列不同的格式。

除了上面的之外,还有很多东西可以用 serve 完成。我认为有一些用例相当有趣,值得分享。

滥用序列化

Abusing Serialization

其中一个有趣的用例是用 serde 作为某种形式的反射框架,将结构体暴露给其他的不能原生支持 Rust 结构体的环境。在这些情况下,作为一个开发者,你序列化了一个可以被序列化的对象,接着立即以某种稍微不同的格式再次反序列化它。相比于反序列化,我们也可以自定义一个序列化器用来 '捕获' 序列化的调用。这是在 IPC,模板引擎上下文、格式转换中常用的模式。

这在实践中大概是什么样呢?让我们从用户的角度看一下我写的 MiniJinja 模板引擎。MiniJinja 使用 serde 作为核心数据模型,将结构化的数据传递给模板,以便它们可以在运行时进行评估。下面是一些给开发者的示例代码:

#[derive(Serialize, Debug)]
pub struct User {
name: String,
}

fn main() {
let mut env = Environment::new();
env.add_template("hello.txt", "Hello {{ user.name }}!")
.unwrap();
let template = env.get_template("hello.txt").unwrap();
let user = User {
name: "John".into(),
};
println!("{}", template.render(context!(user)).unwrap());
}

如你所见,我们定义了一个叫 User 的结构体,可以使用默认的 Serialize 实现将它序列化。这个对象接着被传递到 context!()context!() 所做的就是创建了一个 map,然后将一个键设为 user,接着设置为该变量的值。这样做的目的是允许模板引擎访问到 user 的 '属性',例如 name。Rust 不是动态语言,这意味着通常在运行时做这样的事情是不可能的。但是由于 serde 为 User 实现了 Seralize,我们可以这样做。具体的实现大致如下(伪代码):

impl Serialize for User {
fn serialize(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer
{
let s = serializer.serialize_struct("User", 1);
s.serialize_field("name", &self.name)?;
s.end()
}
}

正常情况下,serializer 是一个类似于 JSON 序列化器的东西,它可以将结构体写入到一个字符串或者是文件,在这个过程中把它编码为 JSON。但是 serde 提供的接口并不要求用户必须这样。实际上,MiniJinja 直接将结构体编码为一个内存中的结构,模板引擎可以解析它。

这种模式并不新颖,serde 本身其实也有使用。当你使用 serde 的 flatter 功能时,serde 会启用一个内部缓冲模式,数据会被存储在一个内部的 Context 类型中,Context 类型可以表示 serde 数据模型的全部内容。然后这个 context 可以被传递给另一个序列化器中。

我不仅在 MiniJinja,同时也在 insta (一个快照测试工具)使用到这种模式。为了避免由于非确定性数据导致的测试快照的不稳定性,我首先将其序列化为一种内部的格式,接着在该格式上进行一个再加工,最后再将其序列化为最终的格式(例如 YAML)。

TLS 恶作剧

TLS Shenanigans

TLS:Thread Local Storage,Shenanigans:恶作剧

然而,MiniJinja 在此处使用 serde 的有趣之处在于,它允许在序列化和序列化器之间传递不兼容的数据。如前所述,serde 有一个特定的数据模型,不符合该数据模型的东西都会遇到这个问题。例如,serde 可以编码的最大整型是 i128。如果你需要一个任意精度的整型,那就不走运了。但是还是有办法的,你可以使用 带内信令(in-band signalling)传递额外数据。例如,serde JSON 序列化器能够表示任意精度整型,因为它在单值对象中保留了一个特殊的键,并用它去指示 JSON 序列化 / 反序列化器组合,决定这个任意精度的整型是否要被序列化。它看起来像这样:

{ "$serde_json::private::Number": "value" }

但是你应该能发现,如果一个人给出了这样的 JSON 文档,serde JSON 会把它当作任意精度的整形去解析,这意味着 'value' 部分本身也需要于 serde 兼容。对于任意精度的整型,这没有问题,因为它可以用字符串表示。但是假如你想在序列化和反序列化中传递的东西根本不能序列化呢?

这时,巧妙地利用 thread local 就是一种变通方法。

在 MiniJinja 中,运行时值的内部表示是一个叫做 Value 的结构体。正如你所期望的,它可以容纳整型,浮点数,字符串,列表,对象等等。然而,他也可以容纳一些 serde 完全无法解析的类型。特别是它可以保存一种特殊类型的字符串,称为 'safe' string, 它是一个存储了安全的 HTML 代码的字符串,不需要转义,也不需要所谓的 '动态值'。后者特别有趣,因为它不能被序列化。

什么是动态值?它实际上是具有状态的对象的句柄,应该直接传递给模板。这里的一个例子是 MiniJinja 中的 loop 变量:

<ul>
{% for item in seq %}
<li>{{ loop.index }}: {{ item }}</li>
{% endfor %}
</ul>

MiniJinja(类似于 Jinja2)提供了一个特殊的 loop 变量可以访问循环的状态。例如,你可以通过 loop.index 来获取当前循环的迭代次数。在 MiniJinja 的工作原理中,'循环控制器' 本身会被直接传递给模板,并且把值本身当作引用计数存进去。

pub struct LoopState {
len: AtomicUsize,
idx: AtomicUsize,
}

let controller = Rc::new(LoopState {
idx: AtomicUsize::new(!0usize),
len: AtomicUsize::new(len),
});

当循环迭代时,控制器上的索引会 +1。

controller.idx.fetch_add(1, Ordering::Relaxed);

控制器本身会被直接添加到上下文中:

let template_side_controller = Value::from_object(controller);

为了达到这个目的,控制器需要实现 MiniJinja 内部的 Object 特征,下面是一个最小实现:

impl Object for LoopState {
fn attributes(&self) -> &[&str] {
&["index", "length"][..]
}

fn get_attr(&self, name: &str) -> Option<Value> {
let idx = self.idx.load(Ordering::Relaxed) as u64;
let len = self.len.load(Ordering::Relaxed) as u64;
match name {
"index" => Some(Value::from(idx + 1)),
"length" => Some(Value::from(len)),
_ => None,
}
}
}

在模板引擎那一边,系统知道当 index 属性被使用时,需要调用 get_attr() 方法。

到目前为止我们所说的都是理论,serde 究竟是如何做的呢?当 Value::from_object 调用时,传入的值会被 move 到 value 对象里。这样做不需要特殊处理,特别是由于已经使用了引用计数。但是现在的问题是,对于像 LoopState 这样本身没有实现 Serialize 的东西,它的值是如何被序列化的?答案是线程本地存储(thread local storage)和一个合作的(co-operating)序列化和反序列化器。

越过边界的 State

Out of Bound State

隐藏在 MiniJinja 的 Value 实现有这样一段代码:

const VALUE_HANDLE_MARKER: &str = "\x01__minijinja_ValueHandle";
thread_local! {
static INTERNAL_SERIALIZATION: AtomicBool = AtomicBool::new(false);
static LAST_VALUE_HANDLE: AtomicUsize = AtomicUsize::new(0);
static VALUE_HANDLES: RefCell<BTreeMap<usize, Value>> = RefCell::new(BTreeMap::new());
}

fn in_internal_serialization() -> bool {
INTERNAL_SERIALIZATION.with(|flag| flag.load(atomic::Ordering::Relaxed))
}

它们的用处是,Value 自身能够感知到什么时候使用内部序列化的特殊形式。这种内部序列化是一种特殊形式的序列化,我们明确知道我们的序列化数据的接收者是一个可以理解该数据的反序列化器。我们没有直接对数据进行序列化,而是将其存入到 TLS 中,然后把数据的句柄序列化到 serde 序列化器中。反序列化器会先反序列化句柄,接着再从 TLS 中提取值。

因此,我们的循环控制器序列化的实现大致如下:

impl Serialize for Value {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// enable round tripping of values
if in_internal_serialization() {
use serde::ser::SerializeStruct;
let handle = LAST_VALUE_HANDLE.with(|x| x.fetch_add(1, atomic::Ordering::Relaxed));
VALUE_HANDLES.with(|handles| handles.borrow_mut().insert(handle, self.clone()));
let mut s = serializer.serialize_struct(VALUE_HANDLE_MARKER, 1)?;
s.serialize_field("handle", &handle)?;
return s.end();
}

// ... here follows implementation for serializing to JSON etc.
}
}

如果它被序列化为 JSON,我们大致能看到这样的东西:

{ "\u0001__minijinja_ValueHandle": 1 }

而真正的循环控制器将被存储在 VALUE_HANDLES 中句柄为 1 处。现在我们如何从里面的到数值呢?在 MiniJinja 中,反序列化其实从未发生,只有序列化。而且序列化也只是将内存中的对象组装起来。因此,我们只需要让序列化器理解带内信令如何处理,并以此找到带外的值。

impl ser::SerializeStruct for SerializeStruct {
type Ok = Value;
type Error = Error;

fn serialize_field<T: ?Sized>(&mut self, key: &'static str, value: &T) -> Result<(), Error>
where
T: Serialize,
{
let value = value.serialize(ValueSerializer)?;
self.fields.insert(key, value);
Ok(())
}

fn end(self) -> Result<Value, Error> {
match self.name {
VALUE_HANDLE_MARKER => {
let handle_id = self.fields["handle"].as_usize();
Ok(VALUE_HANDLES.with(|handles| {
let mut handles = handles.borrow_mut();
handles
.remove(&handle_id)
.expect("value handle not in registry")
}))
}
_ => /* regular struct code */
}
}
}

Ser-to-De

上面的例子是你可以滥用的一种方式,但是同样的模式在真实的序列化和反序列化中也可以用到。在 MiniJinja 中,我可以不使用序列化,因为我有效地利用了序列化代码,从一种内存格式转换到另一种内存格式。如果你想在进程间传递数据,情况就会变得棘手一些,实际的序列化就是必要的。例如,你想建立一个 IPC 系统,在进程之间交换数据,这里的挑战是,出于性能的考虑,对于比较大的内存段,你必须使用共享内存,或者是以文件描述符的形式传递打开的文件(因为这些文件有可能是 socket)。在我的实验性 unix-ipc crate 中,我就是这样做的。

我在这里建立了一个二级缓冲区,它可以放置文件描述符。同样,这里必须使用 TLS。

API 大致如下:

pub fn serialize<S: Serialize>(s: S) -> io::Result<(Vec<u8>, Vec<RawFd>)> {
let mut fds = Vec::new();
let mut out = Vec::new();
enter_ipc_mode(|| bincode::serialize_into(&mut out, &s), &mut fds)
.map_err(bincode_to_io_error)?;
Ok((out, fds))
}

从用户的角度来看,这些都是透明的。当一个 Serailize 实现遇到了一个文件对象时,它可以检查是否应该使用 IPC 的序列化,如果是,它可以把 FD 存起来,enter_ipc_mode 基本上将 fds 绑定到了一个线程局部变量里,接着调用 register_fd 注册它。例如,下面展示了内部句柄的序列化方式:

impl<F: IntoRawFd> Serialize for Handle<F> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: ser::Serializer,
{
if is_ipc_mode() {
// effectively a weird version of `into_raw_fd` that does
// consume
let fd = self.extract_raw_fd();
let idx = register_fd(fd);
idx.serialize(serializer)
} else {
Err(ser::Error::custom("can only serialize in ipc mode"))
}
}
}

然后是反序列化:

impl<'de, F: FromRawFd + IntoRawFd> Deserialize<'de> for Handle<F> {
fn deserialize<D>(deserializer: D) -> Result<Handle<F>, D::Error>
where
D: de::Deserializer<'de>,
{
if is_ipc_mode() {
let idx = u32::deserialize(deserializer)?;
let fd = lookup_fd(idx).ok_or_else(|| de::Error::custom("fd not found in mapping"))?;
unsafe { Ok(Handle(Mutex::new(Some(FromRawFd::from_raw_fd(fd))))) }
} else {
Err(de::Error::custom("can only deserialize in ipc mode"))
}
}
}

从用户的角度来看,他只需要通过 IPC channel 传递一个 Handle::new(my_file) 就能实现。

Serde 的现状

State of Serde

不幸的是,上面所有的东西都依赖线程本地变量和对内信令。整体上都不是很好,如果有一天出了 serde 2.0,我希望有更好的方法实现上面的内容。

实际上,现在的 serde 仍然有不少问题和上述的 Hack 行为相关。

说到这里,在我们需要重写 serde 之前,肯定还有进一步可以被滥用的地方。但是现在是时候应该慢慢考虑 serve 未来版本的设想了它应该对数据模型的支持更友好,可以用更少的 Hack 来脱离规定框架。

· 阅读需 13 分钟

原文链接:https://lucumr.pocoo.org/2022/1/30/unsafe-rust/

翻译:trdthg

选题:trdthg

本文由 Rustt 翻译,StudyRust 荣誉推出

未初始化内存:unsafe Rust 太难了

Rust 在很多意义上不仅仅是一个现代的系统编程语言,也是一个实用的语言。它承诺了自己的安全性,并且提供了一个完整的框架,使得创建安全的抽象成为可能,同时运行时开销很小甚至为 0。你可以使用 unsafe 来明确的脱离安全的 Rust。

如果你之前看过这篇文章,你会惊讶的发现,它和之前的版本大不相同。这篇文章的作者是被 unsafe 的规则所困惑的受害者。我在文章中增加了一个例子,用来更好的展示其中的陷阱。我之前在 Twitter 上说过,编写 unsafe Rust 比 C / C++ 更困难,所以我想为我的观点作出一些解释。

从 C 到 Rust

我们从下面的例子开始:我们有一个待初始化的结构体。比较有趣的字段是 name。它是一个指针,指向一个已经分配好的字符串。除此之外,分配到哪里对我们并不重要,因此我们可以将这个结构体自身分配在栈上。我们的想法是,当这个结构体被初始化之后,它就可以被安全的传递和打印。

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

struct role {
char *name;
bool disabled;
int flag;
};

int main() {
struct role r;
r.name = strdup("basic");
r.flag = 1;
r.disabled = false;
printf("%s (%d, %s)\n", r.name, r.flag, r.disabled ? "true" : "false");
free(r.name);
}

接下来我们用 Rust 去实现上面的代码。现在我们并不需要过多的关注 Rust 文档,只需要专注于一对一翻译即可。在你阅读下面的代码之前还有一点要注意:我们正在有意的创建一个对 Rust 程序员更熟悉的对象,并且可以被看作公共 API。所以我们在这里直接使用 String,而不是 C 语言的字符串。

use std::mem;

struct Role {
name: String,
disabled: bool,
flag: u32,
}

fn main() {
let role = unsafe {
let mut role: Role = mem::zeroed();
role.name = "basic".to_string();
role.flag = 1;
role.disabled = false;
role
};

println!("{} ({}, {})", role.name, role.flag, role.disabled);
}

看到这里,立即就有人想问,这里为什么需要 unsafe?当然了,你的确不需要。但是这段代码使用了一个函数:std::mem::zeroed。如果你尝试在最近的 Rust 编译器运行,应该会的得到这个错误:

thread 'main' panicked at 'attempted to zero-initialize type `Role`,
which is invalid', src/main.rs:11:30

老版本的编译器能够正常运行,但是那其实也是错误的。怎么解决呢?编译器又一次告诉我们解决之法:

warning: the type `Role` does not permit zero-initialization
--> src/main.rs:11:30
|
11 | let mut role: Role = mem::zeroed();
| ^^^^^^^^^^^^^
| |
| this code causes undefined behavior when executed
| help: use `MaybeUninit<T>` instead, and only call
| `assume_init` after initialization is done
|

为什么 Role 类型不支持使用 0 初始化呢?我们需要改动那些代码?我们能不能不初始化?

有人可能会想,使用 #[repr(C)] 强制结构体使用 C 语言的内存布局,但是这不能解决问题。正如编译器给出的建议,我们需要 MaybeUninit

use std::mem::MaybeUninit;

struct Role {
name: String,
disabled: bool,
flag: u32,
}

fn main() {
let role = unsafe {
let mut uninit = MaybeUninit::<Role>::zeroed();
let role = uninit.as_mut_ptr();
(*role).name = "basic".to_string();
(*role).flag = 1;
(*role).disabled = false;
uninit.assume_init()
};

println!("{} ({}, {})", role.name, role.flag, role.disabled);
}

zeroed 换为 MaybeUninit::zeroed 之后,一切都变了。现在我们不能直接使用结构体,而是要操作一个裸指针。由于裸指针没有实现 deref,并且 Rust 中没有 -> 操作符,我们需要手动解引用,并用这种笨拙的语法分配每一个字段。

首先:这样做可行吗?答案是肯定的。但是它正确吗?不正确。

答案在于,任何像可变引用(&mut)或者是栈上的值本身这样的构造,在 unsafe 代码之外仍然需要一直处于有效的状态。zeroed 返回一个值为 0 的结构,我们不能保证它可以有效的表示结构体或者任何其中的字段。在我们的例子中,我们的字符串在所有内容被清零的情况下是有效的,但是这并不能保证,而且是未定义行为。

需要注意的一点是,一个可变引用永远不能指向一个无效的对象,所以在对象的所有字段都被初始化之前,下面的操作是错误的:

let role = &mut *uninit.as_mut_ptr()

所以,让我们把 zeroed 改为 uninit。如果我们再次运行,程序就会崩溃。

// let mut uninit = MaybeUninit::<Role>::uninit();
free(): invalid pointer

为什么会崩溃呢?答案是,通过给 name 赋值一个新的字符串,我们也 drop 了之前的旧字符串。我们之前只是碰巧没有遇到这种情况,因为 Drop 碰巧能够处理一个被清零的字符串。但现在,我们深入了未定义行为。我们如何解决这个问题呢?我们需要以某种方式直接将字符串写到那里的指针。

我们首先要接受 MaybeUninit 是必要的,现在我们要处理这里的裸指针。这有些麻烦,但是看起来不是特别难。现在我们有两个新问题:我们知道 &mut X 是不允许的,但是 *mut X 是允许的。我们如何在不使用 &mut X 的情况下得到一个 *mut X? 讽刺的是,在 Rust 1.51 之前,再不打破任何规则之前,这是不可能的,但是现在,你可以使用 addr_of_mut! 宏。

let name_ptr = std::ptr::addr_of_mut!((*role).name);

太棒了,现在我们拿到了 name 的指针,如何写入呢?我们可以使用 write 方法。

addr_of_mut!((*role).name).write("basic".to_string());

现在完成了吗?还记得我们是如何使用普通结构体的吗?如果阅读一下文档,你就会发现,结构体的内存布局没有任何保证。事实表明,尽管目前的文档是这样说的,但是我们可以依靠字段的对齐性。如果我们处理的是 #[repr(packed)],我们就必须使用 write_unaligned 方法来代替。如果 Rust 选择的结构体的一个成员是不对齐的,这是合法的。

最终的代码:

use std::mem::MaybeUninit;
use std::ptr::addr_of_mut;

struct Role {
name: String,
disabled: bool,
flag: u32,
}

fn main() {
let role = unsafe {
let mut uninit = MaybeUninit::<Role>::uninit();
let role = uninit.as_mut_ptr();
addr_of_mut!((*role).name).write("basic".to_string());
(*role).flag = 1;
(*role).disabled = false;
uninit.assume_init()
};

println!("{} ({}, {})", role.name, role.flag, role.disabled);
}

什么时候用 addr_of_mut!

一般有两种情况:未初始化的内存,未对齐的引用。Rust 不允许用户创建一个未对齐的引用(即时只是暂时的),同时也不允许创建一个对未初始化内存的引用。那么,这些引用是什么时候被创建的呢?

对于下面的代码:(*flag).flag = 1,根据 Rust 的规则,如果一个类型没有实现 Drop,这是可以的。如果该类型实现了 Drop,这行代码会产生很多问题:当 Drop::drop 被调用时,并且调用在未初始化的内存上,这时我们就需要 addr_of_mut!。这就是为什么我们可以直接为 flag 字段赋值,但是我们却需要通过 addr_of_mut! 来获取 name 字段,因为它是一个字符串。

MaybeUninit

对安全的理解随着时间的推移而不断改变。曾经,mem::uninitialized 被认为是一个健全的 API,但是在后来,MaybeUninit 被引入去解决发现的缺点。但是,由于部分初始化的类型的存在,MaybeUninit 在实践中并不理想。虽然由于 #[repr(transparent)], MaybeUninit 和 T 是内存兼容的,但是在嵌套使用时的效果并不佳。

有时你需要结构体的某个字段上有一个 MaybeUninit,但是只后你又希望这个抽象不存在,这种情况并不罕见。实际上,在实践中使用 MaybeUninit 是一个充满挑战的体验,但是这篇文章并没有体现出来。

我的 unsafe 代码正确吗?

在 2022 年,我承认,我不再对编写 Rust 代码感到自信。unsafe 的规则可能可能都是如此复杂,但是从我多年来阅读过的 unsafe 代码来说,大多数 unsafe 代码都不太关心这些规则,并且无视了它们。addr_of_mut!直到 1.53 才被添加到语言中是有原因的。即使到了今天,文档中都说它 Rust 结构体 repr 的对齐方式没有任何保证。

在过去的几年里,似乎发生了这样的事情:Rust 开发者在实践中编写 unsafe 越来越困难,现在的规则是如此复杂,以至于对一个随意的程序员来说非常难以理解,围绕他的文档也很容易被曲解。我在这篇文章的上一个版本中认为 addr_of_mut! 的一些使用是必要的,但实际上并非如此。在有人指出这个错误之前,文章已经得到了大量关注。

这些规则使得 Rust 最好的功能之一越来越难以接近,同时也越来越难以理解。要求存在 MaybeUninit,而不仅仅是过去的 mem::uninitialized API 是显而易见的,但是却展示了语言规则是多么的复杂。

我不认为这是好的。事实上,我认为这根本不是一个好的趋势,好像越来越少的人了解 unsafe Rust。与 C 的互操作性是让 Rust 伟大的一个原因,但是我们现在正在创建巨大的屏障,这是不可取的。更重要的是:编译器在指出我的错误时没有什么帮助。

让 unsafe 变得更符合人体工程学是一个困难的问题,但是它值得被解决。因为有一点很明确:人们不会很快停止编写 unsafe 代码。

· 阅读需 13 分钟

原文链接:https://lucumr.pocoo.org/2022/1/6/rust-extension-map/

翻译:trdthg

选题:trdthg

本文由 Rustt 翻译,StudyRust 荣誉推出

拓展 Rust 中的 Map

在 Rust 中,如果你想为用户提供一个灵活的 API,一般可以引入泛型参数。以一个 web 框架为例,它可能需要一个程序类型,并且需要传递给很多函数。这个程序类型需要能够以配置的形式被参数化。

引入 Any 特征

一个解决方法是使用 Any 特征。它需要一个 'static 的生命周期,当你之后使用它时,还需要用 Box 进行装箱。比如我们可能对它进行向下转型,即转换为原始的类型。这意味着你可以在某个地方(比如我们的 App)中存储和获取任意类型。

我们期望的 API 大致如下:

let app = App::new();

// place in extension map
app.extensions().insert(Config { ... });
app.extensions().insert(Database { ... });

// retrieve from extension map
let config = app.extensions().get::<Config>();

我们的 app 需要容纳其他拓展的类型,以便之后使用。

现在,让我们试试最简单的实现方式:准备一个 Extensions 对象,让它实现插入和获取的方法。如果一个拓展还不存在,我们就自动插入一个默认的(需要实现 Default 特征)。

use std::collections::HashMap;
use std::any::{Any, TypeId};

#[derive(Default)]
pub struct Extensions {
map: HashMap<TypeId, Box<dyn Any>>,
}

impl Extensions {
pub fn insert<T: 'static>(&mut self, value: T) {
self.map.insert(TypeId::of::<T>(), Box::new(value));
}

pub fn get<T: 'static>(&self) -> &T {
self.map.get(&TypeId::of::<T>())
.and_then(|b| b.downcast_ref())
.unwrap()
}

pub fn get_mut<T: Default + 'static>(&mut self) -> &mut T {
self.ensure::<T>();
self.map.get_mut(&TypeId::of::<T>())
.and_then(|b| b.downcast_mut())
.unwrap()
}

fn ensure<T: Default + 'static>(&mut self) {
if self.map.get(&TypeId::of::<T>()).is_none() {
self.insert(T::default());
}
}
}

上面的代码非常直接,但是存在两个问题:首先,只有 get_mut 能够调用 ensure 去插入默认值,如果有人直接调用 get 就会导致 panic。第二个问题是,借用检查器会让之后的编写非常困难。上面的 map 对于解决经典的问题(例如 app)是很有用的,你只需要配置一次,自那之后 map 就像是被冻结了一样,因为有太多的引用在飞来分飞去,以至于没有人能够得到 &mut 的引用。

how does it work?

上面的代码是如何做到的呢,Rust 中的每一种类型都会有一个 type ID,你可以使用 TypeId::of::<T>() 获取。他是唯一的,你可以用它进行比较,或者是作为 map 的键来使用。每种类型只允许有一个值。接着我们把 T 作为 dyn Any 存储在 map 里,Any 特征允许我们使用 downcast_refdowncast_mut 方法拿到原始类型。由于我们使用了 ensure 方法确保这里的类型存在,因此可以安全的 unwrap。

内部可变性

让我们看一个 web 框架或者是模板引擎的常见案例。以 MiniJinja(模板引擎)为例,它里面有一个 State 对象,每次模板初始化时都会创建一次,State 没有实现 Send 和 Sync,MiniJinja 在评估时需要 State。如果你想让用户能够放入自定义的 State 呢?在这种情况下,我们可以通过在内部使用 RefCell 来调整上面的类型。

use std::collections::HashMap;
use std::any::{Any, TypeId};
use std::cell::{Ref, RefCell, RefMut};

#[derive(Default)]
pub struct Extensions {
map: RefCell<HashMap<TypeId, Box<dyn Any>>>,
}

impl Extensions {
pub fn insert<T: 'static>(&self, value: T) {
self.map.borrow_mut().insert(TypeId::of::<T>(), Box::new(value));
}

pub fn get<T: Default + 'static>(&self) -> Ref<'_, T> {
self.ensure::<T>();
Ref::map(self.map.borrow(), |m| {
m.get(&TypeId::of::<T>())
.and_then(|b| b.downcast_ref())
.unwrap()
})
}

pub fn get_mut<T: Default + 'static>(&self) -> RefMut<'_, T> {
self.ensure::<T>();
RefMut::map(self.map.borrow_mut(), |m| {
m.get_mut(&TypeId::of::<T>())
.and_then(|b| b.downcast_mut())
.unwrap()
})
}

fn ensure<T: Default + 'static>(&self) {
if self.map.borrow().get(&TypeId::of::<T>()).is_none() {
self.insert(T::default());
}
}
}

从用户的角度来看,几乎没有变化。主要的区别是你不需要一个可变引用就能调用 get_mut,这一壮举是由 RefCell 实现的,Refcell 能够将检查移动到运行时。当一个 RefMut 被给出时,如果已经存在任何的可变或不可变引用,就会发生 panic。对于这里的用户来说,这并不是一个很大的问题,因为我们可以很容易地确保只有一个可变的引用在使用。特别棒的是,Ref 和 RefMut 类型提供了一个静态的 map 方法,让你可以轻松派生出另一个 Ref 或 RefMut,并保持原来的引用,但对值进行转换。

同步支持

如果我们想要用 Send 和 Sync 来实现和上面相同的效果呢?我们需要一个锁。可惜的是标准库提供的 Mutex 和 RwLock 不能让你在拿到锁的同时 map,你可以使用 parking_lot 替代,它实现了必要的一些方法。

use parking_lot::{
MappedRwLockReadGuard,
MappedRwLockWriteGuard,
RwLock,
RwLockReadGuard,
RwLockWriteGuard,
};
use std::any::{Any, TypeId};
use std::collections::HashMap;

#[derive(Default)]
pub struct Extensions {
map: RwLock<HashMap<TypeId, Box<dyn Any>>>,
}

impl Extensions {
pub fn insert<T: Send + Sync + 'static>(&self, value: T) {
self.map.write().insert(TypeId::of::<T>(), Box::new(value));
}

pub fn get<T: Send + Sync + Default + 'static>(&self) -> MappedRwLockReadGuard<'_, T> {
self.ensure::<T>();
RwLockReadGuard::map(self.map.read(), |m| {
m.get(&TypeId::of::<T>())
.and_then(|b| b.downcast_ref())
.unwrap()
})
}

pub fn get_mut<T: Send + Sync + Default + 'static>(&self) -> MappedRwLockWriteGuard<'_, T> {
self.ensure::<T>();
RwLockWriteGuard::map(self.map.write(), |m| {
m.get_mut(&TypeId::of::<T>())
.and_then(|b| b.downcast_mut())
.unwrap()
})
}

fn ensure<T: Default + Send + Sync + 'static>(&self) {
if self.map.read().get(&TypeId::of::<T>()).is_none() {
self.insert(T::default());
}
}
}

注意:由于 Any 并没有实现 Debug,所以我们很难为我们的 map 实现 Debug 特征,一些简单的改变并不能解决目前的问题。下半部分我们将介绍 as-any 模式

我们面临的挑战是,在 Rust 里,你不能使用 Box<Any + Debug>,然而还是有一些方法解决这个问题。

为 map 实现 Debug

简化问题

我们的目标是对 Box<dyn Any> 做一个包装,并让 Wrapper 实现 Debug。

#[derive(Debug)]
struct AnyBox(Box<dyn Any + Debug>);

如果你尝试编译,编译器应该会很不高兴的抛出错误:

error[E0225]: only auto traits can be used as additional traits in a trait object
--> src/main.rs:9:29
|
9 | struct AnyBox(Box<dyn Any + Debug>);
| --- ^^^^^ additional non-auto trait
| |
| first non-auto trait
|
= help: consider creating a new trait with all of these as supertraits and
using that trait here instead: `trait NewTrait: Any + Debug {}`

超级特征

幸运的是,编译器再次为我们指明了解决之道,我们需要创建一个父特征,并利用特征约束。同时,我们为所有实现了 Any 和 Debug 的类型实现我们的超级特征。就像下面这样:

#[derive(Debug)]
struct AnyBox(Box<dyn DebugAny>);

trait DebugAny: Any + Debug {}

impl<T: Any + Debug + 'static> DebugAny for T {}

你可以想这样构建一个 Box,但是真正不能通过编译的是向下转型

fn main() {
let any_box = AnyBox(Box::new(42i32));
dbg!(any_box.0.downcast_ref::<i32>());
}

编译器会告诉我们,AnyBox 中的值并没有 downcast_ref 方法

error[E0599]: no method named `downcast_ref` found for struct
`Box<(dyn DebugAny + 'static)>` in the current scope
--> src/main.rs:15:20
|
15 | dbg!(any_box.0.downcast_ref::<i32>());
| ^^^^^^^^^^^^ method not found in `Box<(dyn DebugAny + 'static)>`

原因是 Box<dyn DebugAny> 并不是 Box<dyn Any>,因此我们不能那里得到 Any 特征拥有的方法。那么我们如何解决这个问题呢?最简单的方法是 "as any" 模式,我们在我们的 DebugAny 特征上实现一个方法,将其向上转换为一个 Any。看起来像这样:

trait DebugAny: Any + Debug {
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}

impl<T: Any + Debug + 'static> DebugAny for T {
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

现在虽然我们依然不能在 DebugAny 上调用 downcast_ref,但是我们可以拿走它的值,并调用 as_any 得到一个 &dyn Any

fn main() {
let any_box = AnyBox(Box::new(42i32));
dbg!(any_box.0.as_any().downcast_ref::<i32>());
dbg!(&any_box);
}

但是当我们运行后,却得到了一个 None。发生什么事了???

[src/main.rs:23] any_box.0.as_any().downcast_ref::<i32>() = None

这个谜题的答案与方法解析的工作方式和空白实现有关。当我们在 Box<dyn DebugAny> 上调用 as_any 时,Box 并没有发生自动解引用,事实上调用的是 Box<dyn DebugAny> 的 as_any,因为 Box 现在也实现了我们的 DebugAny。那么,我们如何穿过这个 Box 呢?通过手动解引用。

fn main() {
let any_box = AnyBox(Box::new(42i32));
dbg!((*any_box.0).as_any().downcast_ref::<i32>());
dbg!(&any_box);
}

这样就是我们预期的值了

[src/main.rs:23] (*any_box.0).as_any().downcast_ref::<i32>() = Some(
42,
)
[src/main.rs:24] &any_box = AnyBox(
42,
)

可调试的 Extension Map

有了上面的经验,我们现在可以拿出之前的非同步 map,稍加改造就能为其实现 Debug。

use std::any::{Any, TypeId};
use std::cell::{Ref, RefCell, RefMut};
use std::collections::HashMap;
use std::fmt::Debug;

trait DebugAny: Any + Debug {
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}

impl<T: Any + Debug + 'static> DebugAny for T {
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

#[derive(Default, Debug)]
pub struct Extensions {
map: RefCell<HashMap<TypeId, Box<dyn DebugAny>>>,
}

impl Extensions {
pub fn insert<T: Debug + 'static>(&self, value: T) {
self.map
.borrow_mut()
.insert(TypeId::of::<T>(), Box::new(value));
}

pub fn get<T: Default + Debug + 'static>(&self) -> Ref<'_, T> {
self.ensure::<T>();
Ref::map(self.map.borrow(), |m| {
m.get(&TypeId::of::<T>())
.and_then(|b| (**b).as_any().downcast_ref())
.unwrap()
})
}

pub fn get_mut<T: Default + Debug + 'static>(&self) -> RefMut<'_, T> {
self.ensure::<T>();
RefMut::map(self.map.borrow_mut(), |m| {
m.get_mut(&TypeId::of::<T>())
.and_then(|b| (**b).as_any_mut().downcast_mut())
.unwrap()
})
}

fn ensure<T: Default + Debug + 'static>(&self) {
if self.map.borrow().get(&TypeId::of::<T>()).is_none() {
self.insert(T::default());
}
}
}

向 map 里面添加点东西,打印一下:

[src/main.rs:63] &extensions = Extensions {
map: RefCell {
value: {
TypeId {
t: 13431306602944299956,
}: 42,
},
},
}

在这个例子中,我在 map 中放置了一个 32 位的整数 42,它打印出了作为键的 TypeId,和作为值的 42。

保留类型名称

如果你想保留原来的类型名称,而不仅仅是类型的 ID,我们可以使用一个自定义的类型作为 map 的键。通过对 TypeId 和 TypeName 做一次简单的包装就能轻松实现:

use std::any::{TypeId, type_name};
use std::hash::{Hash, Hasher};
use std::fmt::{self, Debug};

pub struct TypeKey(TypeId, &'static str);

impl TypeKey {
pub fn of<T: 'static>() -> TypeKey {
TypeKey(TypeId::of::<T>(), type_name::<T>())
}
}

impl Hash for TypeKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}

impl PartialEq for TypeKey {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl Eq for TypeKey {}

impl Debug for TypeKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.1)
}
}

接着用它替换掉原来的键,调试一下:

[src/main.rs:90] &extensions = Extensions {
map: RefCell {
value: {
i32: 42,
alloc::vec::Vec<i32>: [
1,
2,
3,
],
},
},
}

注意,我在 map 中额外插入了一个 Vec<i32>,以获得更明显的输出。

· 阅读需 4 分钟

nixos

安装注意

在执行最后的 nixos-install 之前记得打开网络设置,不然就要重装

网络设置

systemctl start wpa_supplicant

wpa_cli
add_network
set_network 0 ssid | psk | key_mgmt

WPA-PSK

list_network
enable_network 0

关闭设备(wifi)被禁用

rfkill list
rfkill unblock all

亮度设置

programs.light.enable = true;
light -U 30 # darker.
light -A 30 # brighter.

音量调节


alsamixer

amixer set Master mute
amixer set Master unmute
amixer set Master 10%
amixer set Master 20%

i3wm

多屏幕

xrandr
xrandr --output DP-1 --auto --right-of eDP-1

manjaro

cpu 调频

  1. 查看当前所有 CPU 的信息:
cpupower -c all frequency-info
  1. 设置所有 CPU 为性能模式:
cpupower -c all frequency-set -g performance
  • performance: 固定最高运行频率上,不动态调节。

  • powersave: 固定工作在其支持的最低运行频率上

  • ondemand: 按需快速动态调整 CPU 频率,一有 cpu 计算量的任务,就会立即达到最大频率运行,等执行完毕就立即回到最低频率;

  • conservative: 与 ondemand 不同,平滑地调整 CPU 频率,频率的升降是渐变式的,会自动在频率上下限调整,和 ondemand 的区别在于它会按需分配频率,而不是一味追求最高频率;

自动挂载

# 100mb 虚拟硬盘
mount tmpfs in /home/trdthg/tmp/
tmpfs /home/trdthg/tmp tmpfs size=96m 0 0

# 1.查看电脑中所有硬盘的分区情况。
# 命令如下:
# sudo fdisk -l
# 2.结果如下
# /dev/nvme0n1p3 567296 210282495 209715200 100G Microsoft 基本数据
# /dev/nvme0n1p4 210282496 872337407 662054912 315.7G Microsoft 基本数据

#auto mount windows fs
/dev/nvme0n1p3 /mnt/C ntfs nls=utf8,umask=000 0 0
/dev/nvme0n1p4 /mnt/D ntfs nls=utf8,umask=000 0 0

# 注: 末尾的 2 行是添加的内容。其中/dev/nvme0n1p3 一行代表 C 盘分区将自动挂载到/mnt/C 目录下,文件系统为 NTFS(如果步骤 1 中查看分 区的文件系统为 FAT32 时,此处请写 vfat),字符编码为 utf8。umask 表示文件目录的权限,此参数以及之后的 2 个参数都为 0 即可。以下几行以 此类推。此处可以选择性的添加需要自动挂载的分区,不想挂载的分区不用书写。

openssh-server(sshd) 启动

OpenSSH

OpenSSH 可以支撑 Manjaro 成为 SSH Server,以便其他主机可以通过 SSH 连接到 Manjaro。

# 安装 OpenSSH
sudo pacman -S openssh
# 开机自启 sshd 服务
sudo systemctl enable sshd
# 启动 sshd 服务
sudo systemctl start sshd
# 重启 sshd 服务
sudo systemctl restart sshd

gamepad

驱动下载:xboxdrv

蓝牙:

bluetoothctl pair <mac_addr>
bluetoothctl connect <mac_addr>
bluetoothctl remove <mac_addr>
bluetoothctl trust <mac_addr>

如何使用 bluetoothctl 在 Linux 上管理蓝牙设备

测试:

树莓派和手柄 - 蓝牙连接

资料:

How to Set Up and Use Game Controllers on Linux Arch-Wiki Gamepad Guide – Configuring XInput support for Linux

· 阅读需 9 分钟

Bitcast

  • 日志型
  • 基于 hash 表
  1. 只支持追加 Bitcast 仅支持追加操作(Append-only),即所有的写操作只追加而不修改老的数据。
  2. 多版本文件
    • 每个文件有一定的大小限制,当文件增加到相应的大小时,就会产生一个新的文件,老的文件只读不写。
    • 在任意时刻,只有一个文件是可写的,用于数据追加,称为活跃数据文件(active data file)。而其他已经达到大小限制的文件,称为老数据文件(older data file)。
  3. 日志文件
    • Bitcast 数据文件中的数据是一条一条的 "操作", 所有的操作都会序列化到日志文件里 (包括删除).
    • 写入时首先将 Key-Value 记录追加到活跃数据文件的末尾,接着更新内存哈希表,因此,每个写操作总共需要进行一次顺序的磁盘写入和一次内存操作。
  4. 日志压缩
    • Bitcask 需要定期执行合并(Compaction)操作以实现垃圾回收。
    • 合并操作,即将所有老数据文件中的数据扫描一遍并生成新的数据文件,同一个 key 的多个操作以只保留最新一个的原则进行删除,每次合并后,新生成的数据文件就不再有冗余数据了。
  5. hash 索引
    • 哈希索引存储在内存中,如果不做额外的工作,服务器断电重启重建哈希表需要扫描一遍数据文件
    • 如果数据文件很大,这是一个非常耗时的过程。可以通过索引文件(hint file)来提高重建哈希表的速度。

LSM

核心思想

放弃部分读能力,换取写入的最大化能力

先可以将更新的数据驻留在内存中,等到积累足够多之后,再使用归并排序的方式将内存内的数据合并追加到磁盘中。

LSM-tree 的主要思想是划分不同等级的树。

  • 以两级树为例,可以想象一份索引数据由两棵树组成,一棵树存在于内存,一棵树存在于磁盘。
  • 内存中的树可以不一定是 B 树,可以是其他的树,例如 AVL 树。因为数据大小是不同的,没必要牺牲 CPU 来达到最小的树高度,而存在于磁盘的树则是一棵 B 树。

写入过程

  • 在 LSM 树中,写入数据时首先会插入到内存的树中。

  • 当内存中的树的数据超过一定阈值时,会进行合并操作。合并操作会顺序遍历内存中的树的叶子节点,并与磁盘中的树的叶子节点进行合并

    • 当被合并的数据量达到磁盘的存储页的大小时,会将数据持久化到磁盘,同时更新父亲节点对叶子节点的指针。
  • LSM 树可以划分成很多层级的树

    • 第 0 层存储在内存,第 1 到 k 层存储在磁盘,每层的数据都是有序的。
    • 数据首先会插入到第 0 层,然后后台逐层合并。
    • 每一层的数据超过一定阈值时,往下一层合并。
    • 读取叶由于不知道数据在哪一层上,可能需要遍历所有的层。

Level DB

LevelDB 存储引擎主要包括:

  • 内存中的 MemTable 和不可变 MemTable(也称为 Frozen MemTable)
  • 磁盘上的几种主要文件:
    • 当前(Current)文件
    • 清单(Manifest)文件
    • 操作日志(Commit Log,也称为提交日志)文件
    • SSTable 文件

写入过程:

  • 当应用写入一条记录时,LevelDB 会首先将修改操作写入到操作日志文件,成功后再将修改操作应用到 MemTable,这样就完成了写入操作。
  • 当 MemTable 占用的内存达到一个上限值后,需要将内存的数据转储到外存文件中。
  • LevelDB 会将原先的 MemTable 冻结成为不可变 MemTable,并生成一个新 MemTable。新到来的数据被记入新的操作日志文件和新生成的 MemTable 中。
  • 不可变 MemTable 的内容是不可更改的,只能读取不能写入或者删除。LevelDB 后台线程会将不可变 MemTable 的数据排序后转储到磁盘,形成一个新的 SSTable 文件,这个操作称为 Compaction。
  • SSTable 文件是内存中的数据不断进行 Compaction 操作后形成的,且 SSTable 的所有文件是一种层级结构,第 0 层为 Level 0,第 1 层为 Level 1,以此类推。

SSTable: SSTable是一个键是有序的,存储字符串形式键值对的文件。它是一个内部包含了任意长度、排好序的键值对集合的文件。SSTable文件由两部分数据组成:索引和键值对数据。所有的key和value都是紧凑地存放在一起的,如果要读取某个键对应的值,需要通过索引中的key:offset来定位。SSTable在序列化成文件之后,是不可变的,因为此时的SSTable,就类似于一个数组一样,如果插入或者删除,需要移动一大片数据,开销比较大。

加快访问效率:

  • LSM 树写入效率很高,但读取可能需要访问较多的磁盘文件,效率较低。为了加快读取效率,工程实现上一般使用 Bloom Filter 来加快读取效率。它使用很小的存储空间换来较大的读取效率提升。

Bloom Filter 是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。

  • Bloom Filter 的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter 不适合那些“零错误”的应用场合。
  • 而在能容忍低错误率的应用场合下,Bloom Filter 通过极少的错误换取了存储空间的极大节省。

初始状态时,Bloom Filter 是一个包含 m 位的位数组,每一位都置为 0。为了表达 S={x1, x2,…,xn}这样一个 n 个元素的集合,Bloom Filter 使用 k 个相互独立的哈希函数(Hash Function),它们分别将集合中的每个元素映射到{1,…,m}的范围中。对任意一个元素 x,第 i 个哈希函数映射的位置 hi(x) 就会被置为 1(1≤i≤k)。注意,如果一个位置多次被置为 1,那么只有第一次会起作用,后面几次将没有任何效果。在判断 y 是否属于这个集合时,我们对 y 应用 k 次哈希函数,如果所有 hi(y) 的位置都是 1(1≤i≤k),那么我们就认为 y 是集合中的元素,否则就认为 y 不是集合中的元素。

· 阅读需 1 分钟

v3 Docusaurus

2022.10.21 从 vuepress2 迁移到 Docusaurus

v2 vuepress3

v1 vuepress1

基于 typora 处理图片

# rm -rf docs/.vuepress/dist
cd vuePressBlog
# 生成静态文件
pnpm run build

# 图片源修改
rm docs/.vuepress/public/assets/img/*
cp /home/trthg/.config/Typora/typora-user-images/* docs/.vuepress/public/assets/img/

# md 引用图片路径修改
sed -i "s/\/home\/trthg\/.config\/Typora\/typora-user-images/\/assets\/img/g" `grep -rl "/assets/img" ./`

# # /* 会忽略。开头的文件 /. 不会
rm -r ../assets
rm -r ../java
rm -r ../other
rm -r ../js
rm -r ../python
rm -r ../rust
rm -r ../ioclub
rm -r ../magic
rm ../*.html
# rm ../*.png
# rm ../*.jpg

mv docs/.vuepress/dist/* ../

curDate=$(date "+%Y-%m-%d")
curTime=$(date "+%H:%M:%S")
# # git init
cd ..
git add .
git commit -s -m "commit: $curDate $curTime"
git push -u origin main

· 阅读需 13 分钟

优势

  • 通过并行计算增加容量 (parallelism)

  • 通过复制容忍故障 (fault tolerance)

  • 匹配物理设备的分布 (physical)

  • 通过隔离来实现安全 (security / isolated)

    • 将安全的和不安全的计算放在不同机器上运行
    • 系统间通过网络进行交互

挑战

  • 分布式系统有许多部分组成,这些部分是同时运行的,会遇到各种并发编程和复杂交互带来的问题。因此需要某些机制在时间上进行控制(比如超时机制,熔断机制)。

  • 多台计算机加网络会使故障原因也更加复杂

  • 性能,n 台计算机并不能达到 n 倍的性能

主题

  1. 一致性

    通用的基础设施需要有明确的行为。例如,"Get(k) 获取最近一次 Put(k,v) 的值"。

    实现良好的行为是很难的! "复制" 的服务器很难保持一致。

  2. 性能

    目标:可扩展的吞吐量

    Nx 个服务器,通过并行的 CPU、磁盘、网络实现 Nx 个总吞吐量。随着 N 的增长,扩展会变得更加困难,负载不平衡。有些事情不会随着 N 的增加而加快,例如初始化、交互。

  3. 权衡

    容错性、一致性和性能是敌人。实现容错性和一致性需要通信

    • 发送数据到备份

    • 检查数据是否是最新的。

    • 通信通常很慢,而且不可扩展

    • 许多设计只提供弱的一致性,以获得速度。

      • 例如,Get() 并不*产生最新的 Put()! 对于应用程序的程序员来说,这是很痛苦的,但可能是一个很好的权衡。

    我们会在一致性/性能中看到许多设计点。

  4. 实现

    RPC、线程、并发控制。

Lab

  • Map-Reduce

  • Raft 解决容错性

  • 使用 Raft 构建 K/V server,它可以被复制

  • Sharded K/V server 将有可复制能力的主备 K/V server 克隆到多个组中,并将之前的数据分割存储到这些组中,提高运行速度(每个组只存储自己对应的数据,组合起来就是一整份数据)。同时还要实现在不同的服务期间移动数据,保证不会丢失(数据分片到各个组中,各组的服务器内也会有主从复制)。

Map-Reduce

以一个 word-count 为例,如果集群要对上万的文件进行计算,GFS 会先寻找到文件的所在位置,然后直接在本机的 map-reduce 程序中运行,从而节约了大量的网络传输。

将按行存储转换为按列存储的过程,在论文中成为 shuffle

概述

背景:在多 TB 级数据集上进行多小时的计算,例如,建立搜索索引,或排序,或分析网络的结构,只有在有 1000 台计算机的情况下才实用。

但是应用不是由分布式系统专家编写的,它的总体目标是让非专业的程序员也能轻松使用,对于程序员来说,他只需要定义 Map 和 Reduce 函数 (通常是相当简单的同步代码). MR 管理并隐藏了分布式的所有细节!

一个 MapReduce 作业的抽象视图

输入 1 -> Map -> a,1 b,1
输入 2 -> Map -> b,1
输入 3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,1
| -----> Reduce -> b,2
---------> Reduce -> a,2
  • 1) 输入文件(已经)被分成 M 个文件
  • 2) MR 对每个输入文件调用 Map(),产生一组 k2, v2 的 "中间" 数据,每个 Map() 调用都是一个 "任务"
  • 3) 当地图被 Reduce 时。MR 会收集给定 k2 的所有中间 v2。并将每个键和值传递给一个 Reduce 调用
  • 4) 最终输出是来自 Reduce() 的<k2,v3>对的集合。

以 wordcount 为例:

  • Map(k, v) 将 v 分割成单词
  • 对于每个词 w, emit(w, "1")
  • Reduce(k, v_set)
  • emit(len(v_set))

MapReduce 系统的优缺点

  1. MapReduce 的扩展性很好。

    N 个 worker 计算机(可能)让你获得 Nx 的吞吐量。Maps() 和 Reduce() 可以并行运行,因为它们不相互影响。因此,更多的计算机可以带来更多的吞吐量!

  2. MapReduce 隐藏了很多细节。

    • 发送应用代码到服务器
    • 跟踪哪些任务已经完成
    • 将中间数据从 Maps "洗" 到 Reduce 中去
    • 平衡服务器上的负载
    • 从故障中恢复。
  3. MapReduce 限制了应用程序可以做的事。

    • 没有互动或状态 (除了通过中间输出)
    • 没有迭代
    • 没有实时或流式处理
  4. MapReduce 输入和输出都存储在 GFS 集群文件系统上

    • MR 需要巨大的并行输入和输出的吞吐量。
    • GFS 将文件分割到许多服务器上,以 64MB 为一个块。
      • Map 并行读取
      • Reduce 并行写入
    • GFS 还将每个文件复制到 2 或 3 个服务器上

MR 的工作细节。

  1. MapReduce 需要一个协调器,将任务分配给 worker 并纪录进度。

    协调器将 Map 任务分配给 worker,直到所有的 Map 完成。

    • Map 将输出(或者说中间数据)写到本地磁盘上
    • Map 通过哈希将输出分割到每个 Reduce 任务的一个文件中。

    在所有 Map 完成后,协调器将 Reduce 任务分配给 worker

    • 每个 Reduce 任务从(所有)Map worker 那里获取其中间输出。
    • 每个 Reduce 任务在 GFS 上写入一个单独的输出文件
  2. 什么可能会限制性能?

    CPU、内存、磁盘、网络?在 2004 年,论文作者受到了网络容量的限制。

    MR 在网络上发送什么?

    • Map 从 GFS 读取输入。
    • Reduces 读取 Map 的中间输出。通常和输入一样大,例如用于排序。
    • Reduces 写输出文件到 GFS。

    在 MR 的 shuffle 过程中,一半的流量要经过根交换机。

    论文的根交换机速度为 100 ~ 200 Gb/s,总共有 1800 台机器,所以每台机器可以分得 55 Gb/s。相比于磁盘或 RAM 的速度小得多。

  3. MR 如何尽量减少网络的使用?

    • 协调器试图在存储其输入的 GFS 服务器上原地运行每个 Map 任务。所有的计算机都会同时运行 GFS 和 MR worker, 所以 Map 的输入都会通过 GFS 在本地磁盘读取,而不是网络。

    • 中间数据被分割为许多文件,每个文件都存储了许多 key. 文件数量比 key 要少得多,大文件传输的效率要更高

  4. MR 如何处理负载均衡?

    如果 N-1 个服务器必须等待 1 个慢速服务器完成,则是浪费和缓慢的。但有些任务可能确实比其他任务花的时间更长。

    解决方法:比 worker 数量多得多的任务

    • 协调器将新的任务分配给完成先前任务的 worker。
    • 因此,没有一个任务大到可以支配完成时间(希望如此)。
    • 因此,快的服务器会比慢的服务器做更多的任务,完成的时间也差不多。
  5. MR 的容错性如何?

    如果一个 worker 在 MP 任务中崩溃了怎么办? MR 会对程序员隐藏故障。

    MR 不必从头开始重新运行整个工作,它只重新运行失败的 Map 和 Reduce。假设 MR 将一个 Map 运行了两次,一个 Reduce 看到了第一次运行的输出。另一个 Reduce 看到了第二次运行的输出?

    正确性要求重新执行时产生完全相同的输出。所以 Map 和 Reduce 必须是纯确定性的函数。它们只允许看它们的参数/输入。没有状态,没有文件 I/O,没有交互,没有外部通信。

    如果你想允许 non-functional 的 Map 或 Reduce 呢?worker 失败将重新执行整个工作。或者是回滚到某个全局检查点。

  6. 崩溃恢复的细节

    • 一个 Map worker 崩溃了。

      • 协调器注意到 worker 不再响应 ping
      • 协调器知道哪些 Map 任务在该 worker 上运行
        • 这些任务的中间输出现在已经丢失,必须重新运行
        • 协调器通知其他 worker 运行这些任务
      • 如果所有的 Reduce 任务都获取了中间数据,可以不重新运行。
    • 一个 Reduce worker 崩溃了。

      • 完成的任务是好的 -- 已经存储在 GFS 中,并且保存有副本。
      • 协调器在其他 worker 上重新启动未完成的任务。
  7. 其他故障/问题。

    • 如果协调者给两个 worker 分配了相同的 Map 任务怎么办?

      = 也许协调器错误地认为一个 worker 死了。它将只告诉 Reduce worer 其中一个。

    • 如果协调者给了两个 worker 同样的 Reduce() 任务怎么办?

      • 他们都会试图在 GFS 上写下同一个输出文件!
      • GFS 的原子重命名可以防止混合;一个完整的文件将是可见的。
    • 如果一个 worker 非常慢 --"散兵游勇",怎么办?

      • 也许是硬件它弱。
      • 协调器启动最后几个任务的第二个副本。
    • 如果一个 worker 由于硬件或软件损坏而计算出不正确的输出,怎么办?

      • 太糟糕了!MR 假设 "故障停止" 的 CPU 和软件。
    • 如果协调器崩溃了怎么办?

  8. 目前的状况?

    • 影响力巨大(Hadoop, Spark, &c)。
    • 可能谷歌已经不使用了。
      • 被Flume/FlumeJava取代(见Chambers等人的论文)。
      • GFS 被 Colossus(没有好的描述)和 BigTable 取代。
  9. 结论

    MapReduce 使大集群计算流行起来。

    • 不是最有效或最灵活的。
    • 扩展性好。
    • 易于编程 -- 失败和数据移动被隐藏。

    这些在实践中是很好的权衡。现在已经有了一些更高级的继承者。

· 阅读需 1 分钟

大家好!今天,我们要写一个新的包管理器,甚至比 Yarn 还要好!好吧,也许不是,但至少我们会玩得开心,了解包管理器的工作原理,并思考 Yarn 的下一步可能会发生什么。