Rust – Future 与异步

前言

Rust 1.39 发布已经快一个月了, 而同时 futures 0.3 也已经正式发布, 在之前查看相关资料的时候, 都是在说 Future 怎么使用, 或者是如何实现 Future trait, 很少有看到讲 Future 是怎么运作起来的. 于是今天想来总结一下 rust 异步的一些东西.

Future 0.1 vs 0.3

对于刚刚接触Future的人来说,首先面对的一个问题就是,0.1和0.3版本有啥区别?关于这个问题我之前在知乎某个文章底下评论过,在这里再重复说一遍。

首先大家可以看一下 rfc 里面关于历史的介绍, https://rust-lang.github.io/rfcs/2592-futures.html#historical-context 在这里。

简单的来说就是,早在2016年的时候,社区出了一款库,名字叫 futures,版本就是0.1,在这里提出了最早的Future这个概念,以及对应的 trait。在之后2018年的时候, 大家都知 async/await 关键字开始发展, 于是这个时候 futures 团队起草了一系列的 RFC, 对 Future 进行了很多修改, 最终修改成了 0.3 的样子, 并且将这个 trait 合并到了标准库里面.

因此我们从这个提交 https://github.com/rust-lang/futures-rs/commit/33d6ba46a63ebc137edf47818997be5ce41d5616#diff-541deb29dfb9b7c9148c585d00f14398L3 里面可以看到, 从这个时间上, futures-rs 里面对于 Future trait的依赖已经变成了标准库, 而不是自己定义的. 而这个时候其实就是 0.3 版本了, 当然 0.3 版本拖了相当长的时间才发布.

这意味着如果你想用 Future, 其实是不需要引入 futures-rs 这个第三方库的. 不过实际上它提供了很多实用工具, 不引入写起来其实是挺不舒服的.

Future

在 Rust 中所谓的异步其实是靠着 trait Future 来实现的, 它的描述如下

A future represents an asynchronous computation.

一个 future 代表是一个异步计算, 通常来讲一个 future 是一个还没有计算完成的值, 而一个线程可以在它计算的同时去做一些其他的事情, 而不是阻塞的等待它完成. 于是这里就涉及到暂停恢复两个问题.

1) 什么时候暂停/该怎么暂停
2) 什么时候恢复执行/该怎么恢复

这有点像进程调度, 使用 N 个 CPU 核心来调度 M 个进程. 同理 Future 就是使用 N 个线程去调度 M 个 Future 对象, 在适当的时候暂停一个 Future 切换到其他的任务, 以及在一个合适的时间切换回去.

而如何实现暂停和恢复就是接下来要讲述的.

Futures 0.1

突然说到 futures 0.1 有点奇怪, 毕竟 0.3 都已经发布了, 但了解下历史也不是什么坏处.

futures 的 0.1 版本, trait Future 大概是这么个定义:

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

这除了 poll 函数需要用户自己来实现, 其他的都不需要用户实现, 例如 and_then, map 之类的, 这些更像是工具类的方法.

首先是 poll 的返回类型, 一个 Poll, 他其实是 Result 的一个别名:

type Poll<T, E> = Result<Async<T>, E>;

那么 poll 是什么? 前面说过了一个 Future 是一个异步的计算, 他不应该阻塞线程, 换句话说, poll 函数内的代码应该是执行时间短的, 并且可能会分成很多部分来执行. 它的返回值指示了这个 future 的执行状态, 可以是已经完成, 也可以是还没完成的状态. 这是通过 Async 枚举来完成的, 它有两种类型, ReadyNotReady.

也就是说 poll 方法就是这个异步操作的实现部分, 在 poll 方法中我们将一个可能需要很长时间才能完成的操作放到一个新的线程中执行, 而 poll 根据这个操作是否完成来返回 ReadyNotReady 两种状态.

关于新线程

上一段我提到了 “ 放到一个新的线程中执行 ”,这个说法,其实是不恰当的。更准确的说是,poll 要怎么实现是有多种方式的, 也不一定非要开一个新的子线程, 它可以提交任务到一个线程池, 也可以注册到一个事件循环当中. 这个会在后面 tokio 的部分提到.

poll 方法返回一个 NotReady 的时候, 相当于是告诉调用 poll 的人还没准备好, 你等一会再来调用一次 poll 看看. 这就相当于是一次暂停.

那么恢复呢. “等一会再调用”中的”等一会”究竟得等多久呢?

这时候就需要有一种通知的机制, 当 Future 准备好再次被poll的时候来通知调用者.

听起来有点熟悉, 回调函数是一种方法, 例如 JavaScriptJava 等就是用的这种方法. 但这很容易就会产生回调地狱, 一层套一曾的回调:

Future 的做法是, 提供一个 task::current() 方法. 当你在 poll 函数中调用 task::current() 时, 他返回的 Task 对象就是当前 Future 相关联的. 这个Task在其他语言里面可能有另一个名字, 例如在 kotlin里面它叫做Continuation.

它是怎么做到的?

task::current() 是从一个线程本地存储(thread_local!) 中拿到的当前的 Task 对象的. 听起来不可思议, 别担心, 当我们在后面尝试编写一个 Runtime 的时候就会明白是怎么做到的了.

当我们拿到了 Task 之后, 就可以将这个 task 传递给其他人, 例如一个后台线程. 它有个 notify 方法, 当他被调用的时候就指示这个 Future 准备好了, 请调用者再次尝试 poll 一下来获取 Future 的计算值.

notify 是怎么通知调用者的?

事实上调用者通常不是用户, 而是一个 Runtime. 这是因为所有的Future都会有一个层级, 最顶层的可能是 main 函数, 也可能是在 main 函数里面创建的其他 Future, 而这个Future 最终都会交给 Runtime 来管理. 例如早期的 tokio::run, 和现在的 #[tokio::main]. notify 函数只需要通知这个 Runtime 就好了, 然后Runtime负责把对应的Future对象再次执行一次就好了.

例如有一个 FutureAsyncRead, 代表一个异步读取, 当我们首次调用 poll的时候可能是在 some_io.read(&mut buf) 的时候触发的, 而这个时候IO还没有准备就绪, 没有什么数据可以让我们读,于是我们的 poll 就返回一个 NotReady. 而当IO准备好的时候, 我们就可以通知调用者, hey, 数据准备好了, 你再调用一次 poll, 我会把数据返回给你.

小总结一下

rust 的异步通常是靠一个 Futuretrait 来完成的, Future 需要实现一个 poll 方法, poll 方法需要很快的返回, 因为他不能阻塞当前的线程, 根据这个 Future 的计算进度, poll 方法可以返回 ReadyNotReady 来指示进度. 并且当计算完成时, 通过 Task::notify() 方法来再次调度 Future 的执行.

尝试来写一个 Future

要想理解上面所说的, 最好的办法是我们自己写一个 Future 的实现.

实现一个 Delay, 它什么都不做, 仅仅是在一个固定的时间后完成.

pub struct Delay {
    dur: Duration,
    handle: Option<JoinHandle<()>>,
}

impl Delay {
    pub fn new(dur: Duration) -> Self {
        Delay {
            dur,
            handle: None,
        }
    }
}

impl Future for Delay {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        unimplemented!()
    }
}

就像这样, 因为 Delay 它不计算出任何值, 因此它的Item关联类型是 (). 为了简单我们不考虑 Delay 会出错. 因此Error关联类型也是 ().

根据我们之前的知识, poll 的实现应该是, 第一次 poll 时获取 Task 对象, 并启动一个线程去 sleep, 在 sleep 过后调用一下 notify. 同时 poll 立刻返回 NotReady.

如果你能很自然的想到这个简单的实现方式, 说明你已经理解了Future的基本原理了.

那么可以这样写:

match self.handle {
    None => {
        let task = futures::task::current();
        let dur = self.dur.clone();

        let handle = spawn(move || {
            sleep(dur);

            task.notify();
        });

        self.handle = Some(handle);

        Ok(Async::NotReady)
    }
    Some(_) => {
        Ok(Async::Ready(()))
    }
}

当第一次 poll 时, 启动线程什么也不做, 单纯的 sleep, sleep 完成后调用 notify 调度第二次 poll. 此时进入 match 的第二部分, 返回了 Ready.

可以看出有点像是一个状态机, Future 维持一个状态, 每次 poll 调用发生时, 根据当前的状态执行不同的代码. 事实上这也是大部分协程的一个实现方式, 通常我们叫做生成器 Generator.

Generator

实际上标准库里面的确有生成器这个东西, https://doc.rust-lang.org/std/ops/trait.Generator.html 但没有标准化

来让他跑起来

fn main() {
    let mut list_of_delay = vec![];

    for _ in 0..10 {
        list_of_delay.push(Delay::new(Duration::from_secs(5)))
    }

    let all = join_all(list_of_delay);

    let now = Instant::now();
    all.wait().ok();
    println!("time elapsed: {:?}", now.elapsed());
}

我们创建了十个 Delay 对象, 并且用 join_all 方法创建了一个新的 Future, 这个Future 会在这十个 Delay 完成后才完成. 而这个 waitFuture 提供的众多工具方法之一, 效果就是在当前线程阻塞的等待它的完成.

注意

直到我们调用 wait 方法之前, future 都不会运行! 因为如果 poll 方法都没有被调用过一次, 那我们的实现中的线程也不会被创建, 更别说 “运行” 了.

运行结果:

可以看出尽管我们等待了十个 Delay, 每个 Delay 都是延迟 5s, 但总时间却不是 50s.

编写一个 Runtime (让 Future 跑起来)

之前只是实现了一个 Future, 但还没有讨论如何让他运行起来, 尽管我们用 wait 方法让这个 Future 运行了下. wait 实现原理则是使用一个内置的 Runtime 去运行这个 Future, 并且等待他的完成. 但我们还是不知道他究竟是怎么跑起来的. 除非我们能自己编写一个 Runtime.

编写一个实验性的 Runtime 不是什么难事. (其实还挺麻烦的)

TODO: 实现一个 Runtime.

这部分先暂时不实现了, 略微有点复杂, 等之后有空了再写. 如果实在是好奇怎么实现的, 可以看一下这篇文章 https://stjepang.github.io/2020/01/31/build-your-own-executor.html

Future 0.3

尽管目前为止, futures 0.1 仍然在被广泛使用, 但他将会在不久的将来被 0.3 代替, 所以我们还是来看看 0.3 吧.

0.1 的时候, Future 的定义是让 futures 这个社区库来做的, 并且获取 Task 的方法在我看来是一个非常 hack 风格的做法, 在启用 std 的情况下使用了线程局部存储来实现, 而在 no_std 的情况下则用了其他的方法来实现.

同时 rust 官方和社区都在考虑着将 Future 核心并入到标准库中, 并且这个目标在 1.36 的时候做到了.

当然社区也在考虑对 Future 进行改进, 改进成了现在的形式:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

将原先的两个关联类型改成了只有一个 Output, 而返回类型 Poll 不再是 Result 的别名, 他现在是一个枚举, 相当于以前的 Async 枚举.

pub enum Poll<T> {
    Ready(T),
    Pending,
}

同时在 futures 里面还定义了一个叫做 TryFuturetrait, 长这样

pub trait TryFuture: Sealed + Future {
    type Ok;
    type Error;
    fn try_poll(
        self: Pin<&mut Self>, 
        cx: &mut Context
    ) -> Poll<Result<Self::Ok, Self::Error>>;
}

其实就是把返回类型固定成了 Result, 而标准库的 Future的返回类型其实并不总是Result的.

我们还注意到 poll 方法新增了一个参数 Context, 这是用来取代旧版的 task::current().

据说在演化早期, 第二个参数原本是 Waker 类型, 而社区的人尤其是 tokio 社区认为, 为了方便以后的扩展, 建议将 Waker 包裹在 Context 类型, 扩展时只需要对 Context 进行修改而不会破坏原有的兼容性. 尽管官方尝试反对这个提议, 但最终还是改成了现在的样子. (事实上直到现在为止, Context 里面依然还是只有 Waker 这一个)

而原先的 self 的类型也被改成了 Pin, 不过 Pin 和本文无关, 也就不讨论了. 关于 Pin的大家可以看看这个视频 https://www.youtube.com/watch?v=DkMwYxfSYNQ ,三个多小时, 有点长.

futures 的其他部分则没有被合并到标准库中.

注意

标准库只提供了 Future 的抽象, 而完全不管 Future 应该怎么运行. 将后者交给社区来实现, 例如 tokio, async-std

来用 0.3 版本的 Future 改写一下之前写的 Delay.

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.handle {
            None => {
                let waker = cx.waker().clone();
                let dur = self.dur.clone();

                let handle = spawn(move || {
                    sleep(dur);
                    waker.wake();
                });
                self.handle = Some(handle);

                Poll::Pending
            }

            Some(_) => {
                Poll::Ready(())
            }
        }
    }
}

0.1 的代码比起来, 需要改动的只是 Future 的实现, 并且注意从 std::future 中导入 trait Future.

与之前的变化不大, 仅仅是从 task::current() 改成了 Context::waker(), 和返回类型的修改.

到这里为止, 不需要再像以前一样依赖于 futures 库了, 但这并不意味着 futures 库真的不需要了.

再来测试一下

fn main() {
    let mut list_of_delay = vec![];
    for _ in 0..10 {
        list_of_delay.push(Delay::new(Duration::from_secs(5)));
    }

    let all = futures_util::future::join_all(list_of_delay);

    let now = Instant::now();
    futures_executor::block_on(all);
    println!("time elapsed: {:?}", now.elapsed());
}

就像之前说的, 标准库并没有提供运行 Future 的方式, 要想运行一个或多个 Future, 我们仍然需要依靠外部依赖提供的 Runtime 或者叫 Executor 来完成. 而这里用到了 futures_executor::block_on 这个, 和之前用的 .wait() 其实是差不多的用处.

注意

在上述的 main函数里用 block_on 是不能执行 tokio 或者 async-std 里面关于IO的任务的. 例如已经有一个 AsyncRead 对象的时候, .read() 返回的 impl Future 是不能直接放到 block_on 里面的.

这是因为它需要在 tokioRuntime里才可以使用. 如果一定要这样做, 确保是在 Runtime 里面调用的 block_on, 或者使用 tokio 提供的 toko::runtime::Runtime::block_on.

编写一个 Runtime

TODO: 编写一个新版本的 Runtime.

async 与 await

之前只是讨论了关于 Future 的一些细节, 并没有提到 asyncawait 两个关键字. 这两个关键字是在 2018 edition 里面标准化的.

async 关键字

先说说 async 关键字, 这个是用来修饰函数、闭包、代码块等用的,用途就是字面意思的,和其他语言的 async 是一样的.

例如在 JavaScript 当中, 我们都知道一个 async 修饰的函数, 它的返回值会变成一个 Promise. 在 rust 当中这个也是类似的, 只不过 Promise 变成了 Future.

async fn delay(dur: Duration) -> () {
    todo!()
}

例如上述代码其实大致上等价于

fn delay(dur: Duraton) -> impl Future<Output=()> {
    todo!()
}

当然 async 不只是修改返回类型这么简单, 他同时还基于函数里面的 await 关键字将原本的函数体修改成了 Generator, 并且包裹在 GenFuture里面, 参考 https://github.com/rust-lang/rust/blob/master/library/core/src/future/mod.rs#L61-L88 这部分的说明. 关于 GenFuture 如果你有注意编译错误的话, 是可以在一大堆输出里面见到这个的. 不过现在好像改成 impl Future 之后就见不到了.

await 关键字

其实 await 关键字一直都没标准化主要是社区在讨论它的语法究竟应该设计成什么, https://github.com/rust-lang/rust/issues/57640 具体的 issue在这里.

社区讨论了很多它的语法, 例如

  • 作为一个方法的 task.await()
  • 作为前置关键字的 await task
  • let 上做改动的 let! result = task
  • 作为后置宏的 task.await!()

总之就是千奇百怪都有, 但这些都不重要, 时间返回到大概2018年的时候, 那个时候的 await 还是一个宏, 当然这个宏是编译器自带的宏, 算是一个黑魔法的东西, 用法大概就是 let result = await!(task) 这样.

而这个 await! 宏它的一个可能实现的伪代码长这样

loop {
    let poll = task.poll();
    match poll {
        Ok(Async::Ready(e)) => {
            break Ok(e)
        }
        Ok(Async::Pending) => {}
        Err(e) => {
            break Err(e)
        }
    }
    yield Async::Pending
}

注意

真正的 await! 的实现可能并不是这样子的!

也就是设置一个循环, 每次调用一次 poll, 根据结果来决定是 yield 还是直接 break 出结果. 这个和我们在文章一开头说到的关于 poll 的事情的差不多的意思.

在最后 1.39 的时候, 终于把 .await 的后置语法给标准化了, 不管它的语法变成了什么样子, 但基本上都是靠生成器和Future trait 来实现的.

TODO: async 和 await 的使用案例以及用 Future 实现组合 Future 的部分.

async IO

终于到了大家喜闻乐见的 IO 的阶段了, 前面花了大把的篇幅来说 Future, await/async, Runtime 等东西, 但并没有和实际应用相关联起来. 也就是究竟用 Future 应该怎么实现异步IO.

在前面说 0.1 的时候, 我举了一个 AsyncRead 的例子, 但没有说 IO 没有准备就绪的时候应该怎么做, 以及 IO 准备就绪之后怎么通知. 要说这个, 先提一个大家都听过的东西, epoll.

epoll 模型很简单, 将你感兴趣的文件描述符注册到 epoll 当中, 然后由一个线程去循环调用 epoll_wait, 等待对应事件的发生, 并通知应用程序哪些文件描述符产生了你感兴趣的事件, 例如可读和可写.

于是放到我们的异步IO的实现里, 可以用同样的原理, 启动一个单独的线程, 维护一个类似 epoll 的设施, 并且循环 poll . 当我们用 AsyncRead 的时候, 在 poll 函数里将对应的文件描述符和 Context::waker() 注册到这个设施里, 然后返回一个 NotReady. 当事件发生时, 由这个设施弹出对应的 Waker, 并且唤醒等待IO的那个 Future.

这就是 mio, tokio库的附属组件, 它的作用就是提供了对非阻塞IO的系统抽象, 说人话就是它封装了 epoll, kqueue, IOCP等系统提供的功能, 使得作为库的使用者可以不用关心跨平台设计, 用统一的接口去管理IO事件.

mio仍然属于一个底层的库, 要使用起来并不容易, 因此 tokio在 0.1 的时候, 提供了一个 reactor, 包装了 mio, 对 TcpStreamUdpStream 等进行了整合, 使得用户不用关心怎么实现的, 只需要确保在当我们调用相关联的方法, 例如 read, write 的时候是在tokioRuntime 的内部就好了. 而在 0.2 里面, reactor 已经直接合并到了 Runtime里面, 通过 enable_io()方法可以启用 (默认用 all 的时候就已经启用了.

TODO: 展示一些相关联的核心代码的实现

结语

TODO:

关于 “Rust – Future 与异步” 的 2 个意见

    1. 哈哈哈, 不好意思, 当时写 executor 的时候遇到了点小困难, 后来就忘记写了… 最近更新的话打算先把这部分跳过, 然后把剩下的部分补全下. 顺便其实之前有很多东西都没有交代清楚, 也打算一块修改下.

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注