原文链接:https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/

翻译:trdthg

选题:Akagi201

本文由 Rustt 翻译,StudyRust 荣誉推出

使用 Tokio 处理 CPU 密集型任务

尽管 async 通常都被应用于异步网络 I/O,但是在这篇文章里,我会像你介绍为什么使用 Tokio 处理 CPU 密集型任务(比如数据分析引擎等)也是一个不错的选择。

Tokio 是什么?

Rust 本身提供了一个类似于 JavaScript 的异步编程模型。

为了充分利用多核和异步 I/O。一个运行时是必须的,尽管社区有很多异步运行时的选择,但是 Tokio 是事实上的标准。尽管 Tokio 在官网上描述到它是 Rust 语言的一个异步运行时,并且提供了编写网络服务所需要的模块,它也可以被用在其它场景。

为什么使用 Tokio 处理 CPU 密集型任务

现代化的数据分析引擎总是不可避免的要处理来自客户端的网络请求,以及通过网络和对象存储系统(比如 ASW S3、GCP Cloud、Azure 等)进行通信。因此,任何使用 Rust 实现的系统,大多都会用 Tokio 去处理这部分网络相关的服务,或者是一部分文件 I/O 服务。

除了应对网络外,数据分析引擎还需要做大量繁重的的 CPU 计算,消耗大量 CPU 资源去进行诸如:重新组织数据存储、提前计算各种索引、或者是直接回复客户端请求等工作。这些复杂计算通常会被切成许多单独的块(我把它们称为 "任务"),然后被并行的处理,以利用到现代 CPU 的多核特性。

任务调度器会决定哪个任务应该在什么时候运行,它会将任务映射到合适的 CPU 内核或者是线程上。

学术界和工业界对于各种任务调度器、工作池、线程池等已经积累了很多年的研究。

我自己已经实现并且使用过几个自定义的任务调度器。他们在大多数时间 (99.9%) 都工作的很好,但是在处理边缘情况(比如快速停机、任务取消、清理等)时,他们的效果非常不尽人意。由于这些任务调度器使用了较低级别的线程原语,出现线程间竞争的情况比比皆是,所以我不建议这样做。

因此,当我在 Rust 生态中寻找一个任务调度器时,你会很自然的选择 Tokio。Tokio 有很多优势:

  1. 你只需要 Tokio,并不需要添加其他依赖项。
  2. Tokio 实现了一个复杂的 支持任务窃取的调度器
  3. Tokio 内部实现了对 async/await 的支持。并且有许多相对成熟的库去处理流、异步锁、管道、异步取消等。
  4. Tokio 在 Rust 生态系统中经过了良好测试,并且有着大量使用案例。
  5. Tokio 通常会将正在运行的任务和 Future 放在同一个执行器内,这有利于实现局部缓存。
  6. Tokio 的 文档 很完善,并且在积极更新维护。

因此,选择 Tokio 作为 CPU 密集型任务的任务调度程序是理所应当的,对吧?WROOOOOOOONG!

使用 Tokio 的反对意见

选择 Tokio 在我们团队中变成了一个热门话题,到现在依然不是所有人都认可这个决定。在我们做 DataFusion 和 InfluxDB IOx 的早期,我们很担心这个问题。以下是一些反对意见:

Tokio 文档的警告:

老版本的 Tokio 文档(比如 1.10 版)里面有一条著名的警告:

If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as Rayon.

如果你的代码要处理 CPU 密集型任务,并且想要尽量减少使用到的线程数,你应该将这些任务分配到另一个线程池比如 Rayon。

这个警告对我们团队和社区都造成了很大的困惑。很多人读了之后都以为 Tokio 永远不应该用来处理 CPU 密集型任务。但是文档的关键其实是说,一个运行时实例(同一个线程池)不应该同时用于 I/O 和 CPU 计算,我们之后澄清了文档 的意图。

顺便说一句,Tokio 文档建议用 Rayon 处理 CPU 密集型任务。Rayon 对于很多程序都是很好的解决方案,但是它并不支持异步。如果你的代码中哪怕只有一点需要使用异步,那你就不得不跨过同步和异步的痛苦边界。我还发现实现一个 基于拉取的执行器模型 会更困难,这种模型要求某个任务必须等待所有的输入都准备好在能在 Rayon 中运行

尾部延迟会拖累你

聪明的人会说:使用 Tokio 处理 CPU 密集型任务会增加请求的尾部延迟,这是难以令人接受的。

尾部延迟?🙄

你可能认为:我正在编写一个数据库,尾部延迟听起来像是对于高负载的 Web 服务器的一个学术问题……”

但其实,这也是需要考虑的:思考一下健康检查,健康检查对于使用容器编排系统(比如 Kubernetes)部署的服务是必不可少的。检查的方式通常是发送一个 HTTP 请求到某个 API,例如 /health。如果该请求已经被分派到某一个任务队列中,但是 Tokio 正在忙于使用 CPU 进行大量数据处理任务,那么 Kubernetes 将不能及时得到“系统正常”的响应,你的进程就会被 K8s 杀死。因此得到结论:由于尾部延迟,你不能将 Tokio 用于 CPU 密集型任务。

但是,就像 Tokio 在文档中阐述的,想要防止你的程序在 CPU 完全饱和的情况下被 K8s 误杀,你应该使用两个独立的线程池。一个用来执行对尾部延迟敏感的任务,就比如响应 /health 接口。另一个用来执行 CPU 密集型任务。这些线程池的的最佳线程数需要根据具体需求去调整。

如果你将 Tokio 运行时只是视为一个复杂点的线程池,那么使用多个运行时实例的想法可能更容易接受,我们将在最后使用专用的执行器演示如何实现这个想法。

单任务开销很高

Tokio 的每个任务开销很高。

对于这点,我一点也不惊讶。人们总是可以实现比 Tokio 运行速度更快的线程池。但是,这些线程池并不是足够稳定,难以应对生产环境的负载,并且他们也不具备像 Tokio 一样的庞大生态系统。

在许多场景下,单任务的开销可以使用“矢量化处理” 来分摊。意思是每个任务回同时处理几千行数据而不是单单一行,你需要将任务分成合理大小的块。你也不能分摊所有工作场景下的开销。但是,对于我们的程序关心的实例来说,Tokio 的任务开销已经微乎其微了

实践

假设你已经被说服了使用 Tokio 去处理 CPU 密集型任务是可行的。现在你应该怎么做?

首先,至关重要的一点是,你的代码应该符合以下原则:异步代码永远不应该花费很长时间才能完成,这一点请参考 Alice Ryhl 的 Async: What is blocking?。这是为了让调度器有机会安排其他事情,比如任务窃取等。

当然了,这个“很长时间”取决于你的程序;Ryhl 建议在优化响应的尾部延迟时,单个异步任务完成时间应该在 10 ~ 100 微秒。我认为在针对 CPU 进行优化时 10~100 毫秒也能有不错的效果。但是在我的测试 estimated per-task Tokio overhead 中,Tokio 单任务的开销在约 10 纳秒范围内,因此几乎不可能用 10 毫秒的任务来测量 Tokio 运行时开销。

其次,将任务分派到一个单独的执行器

专用的执行器

这里是一个简单的例子,演示了我们如何在 InfluxDB IOx 上将任务分配到一个单独的 Tokio 运行时上(完整代码可以在我们的仓库里查看,里面还有关于清理、停机、合并的内容)

pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,
}

/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate Tokio Executor
struct State {
    /// Channel for requests -- the dedicated executor takes requests
    /// from here and runs them.
    requests: Option<std::sync::mpsc::Sender<Task>>,

    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there
    thread: Option<std::thread::JoinHandle<()>>,
}

impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio
    /// executor that is separate from the threadpool created via
    /// `[tokio::main]`.
    pub fn new(thread_name: &str, num_threads: usize) -> Self {
        let thread_name = thread_name.to_string();

        let (tx, rx) = std::sync::mpsc::channel::<Task>();

        let thread = std::thread::spawn(move || {
            // Create a new Runtime to run tasks
            let runtime = Tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .thread_name(&thread_name)
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize main runtime
                .on_thread_start(move || set_current_thread_priority_low())
                .build()
                .expect("Creating Tokio runtime");

         // Pull task requests off the channel and send them to the executor
         runtime.block_on(async move {
                while let Ok(task) = rx.recv() {
                    Tokio::task::spawn(async move {
                        task.run().await;
                    });
                }

        let state = State {
            requests: Some(tx),
            thread: Some(thread),
        };

        Self {
            state: Arc::new(Mutex::new(state)),
        }
    }

这段代码会在一个新线程 std::thread,并在这个线程里创建了一个新的 Tokio 运行时。运行时会从 channel 获取任务并运行。

注意:这个新的线程很关键,如果你尝试在主线程里或者是任何已经创建过 Tokio 运行时的线程里再次创建新的运行时,程序就会报错,因为已经有一个运行时了。

下面的代码将任务分派到第二个运行时。

impl DedicatedExecutor {

    /// Runs the specified Future (and any tasks it spawns) on the
    /// `DedicatedExecutor`.
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();

        let fut = Box::pin(async move {
            let task_output = task.await;
            tx.send(task_output).ok()
        });
        let mut state = self.state.lock();
        let task = Task {
            fut,
        };

        if let Some(requests) = &mut state.requests {
            // would fail if someone has started shutdown
            requests.send(task).ok();
        } else {
            warn!("tried to schedule task on an executor that was shutdown");
        }

        Job { rx, cancel }
    }
}

上面的代码使用了一个名为 Job 的结构体,它是一个对 Future 的简单包装,Job 能够将 Future 的执行结果从单独的执行器内传输回主线程。相关代码如下。

#[pin_project(PinnedDrop)]
pub struct Job<T> {
    #[pin]
    rx: Receiver<T>,
}

impl<T> Future for Job<T> {
    type Output = Result<T, Error>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();
        this.rx.poll(cx)
    }
}

就是这样!你可以在 Github gist 中找到所有代码。