Rust – Future 与异步

前言

Rust 1.39 发布已经快一个月了, 而同时 futures 0.3 也已经正式发布, 在之前查看相关资料的时候, 都是在说 Future 怎么使用, 或者是如何实现 Future trait, 很少有看到讲 Future 是怎么运作起来的. 于是今天想来总结一下 rust 异步的一些东西.

Future

在 Rust 中的异步是靠着 trait Future 来实现的, 它的描述如下

A future represents an asynchronous computation.

一个 future 代表是一个异步计算, 通常来讲一个 future 是一个还没有计算完成的值, 而一个线程可以在它计算的同时去做一些其他的事情, 而不是阻塞的等待它完成. 于是这里就涉及到暂停恢复两个问题.

1) 什么时候暂停/该怎么暂停
2) 什么时候恢复执行/该怎么恢复

这有点像进程调度, 使用 N 个 CPU 核心来调度 M 个进程. 同理 Future 就是使用 N 个线程去调度 M 个 Future 对象, 在适当的时候暂停一个 Future 切换到其他的任务, 以及在一个合适的时间切换回去.

而如何实现暂停和恢复就是接下来要讲述的.

Futures 0.1

突然说到 futures 0.1 有点奇怪, 毕竟 0.3 都已经发布了, 但了解下历史也不是什么坏处.

futures 的 0.1 版本, trait Future 大概是这么个定义:

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

这除了 poll 函数需要用户自己来实现, 其他的都不需要用户实现, 更像是工具类的方法.

首先是 poll 的返回类型, 一个 Poll, 他其实是 Result 的一个别名:

type Poll<T, E> = Result<Async<T>, E>;

那么 poll 是什么? 前面说过了一个 Future 是一个异步的计算, 他不应该阻塞线程, 换句话说, poll 函数内的代码应该是执行时间短的. 它的返回值指示了这个 future 的执行状态, 可以是已经完成, 也可以是还没完成的状态. 这是通过 Async 枚举来完成的, 它有两种类型, ReadyNotReady.

也就是说 poll 方法就是这个异步操作的实现部分, 在 poll 方法中我们将一个可能需要很长时间才能完成的操作放到一个新的线程中执行, 而 poll 根据这个操作是否完成来返回 ReadyNotReady 两种状态.

poll 方法返回一个 NotReady 的时候, 相当于是告诉调用 poll 的人还没准备好, 你等一会再来调用一次 poll 看看. 这就相当于是一次暂停.

那么恢复呢. “等一会再调用”中的”等一会”究竟得等多久呢?

这时候就需要有一种通知的机制, 当 Future 准备好再次被poll的时候来通知调用者. 听起来有点熟悉, 回调函数是一种方法, 例如 JavaScriptJava 等就是用的这种方法. 但这很容易就会产生回调地狱, 一层套一曾的回调:

Future 的做法是, 提供一个 task::current() 方法. 当你在 poll 函数中调用 task::current() 时, 他返回的 Task 对象就是当前 Future 相关联的.

它是怎么做到的?

task::current() 是从一个线程本地存储(thread_local!) 中拿到的当前的 Task 对象的. 听起来不可思议, 别担心, 当我们在后面尝试编写一个 Runtime 的时候就会明白是怎么做到的了.

当我们拿到了 Task 之后, 就可以将这个 task 传递给其他人, 例如一个后台线程. 它有个 notify 方法, 当他被调用的时候就指示这个 Future 准备好了, 请调用者再次尝试 poll 一下来获取 Future 的计算值.

notify 是怎么通知调用者的?

事实上调用者通常不是用户, 而是一个 Runtime, notify 函数只需要通知这个 Runtime 就好了.

小总结一下

rust 的异步通常是靠一个 Futuretrait 来完成的, Future 需要实现一个 poll 方法, poll 方法需要很快的返回, 因为他不能阻塞当前的线程, 根据这个 Future 的计算进度, poll 方法可以返回 ReadyNotReady 来指示进度. 并且当计算完成时, 通过 Task::notify() 方法来再次调度 Future 的执行.

尝试来写一个 Future

要想理解上面所说的, 最好的办法是我们自己写一个 Future 的实现.

实现一个 Delay, 它什么都不做, 仅仅是在一个固定的时间后完成.

pub struct Delay {
    dur: Duration,
    handle: Option<JoinHandle<()>>,
}

impl Delay {
    pub fn new(dur: Duration) -> Self {
        Delay {
            dur,
            handle: None,
        }
    }
}

impl Future for Delay {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        unimplemented!()
    }
}

就像这样, 因为 Delay 它不计算出任何值, 为了简单我们不考虑 Delay 会出错. 因此两个关联类型都是 ().

根据我们之前的知识, poll 的实现应该是, 第一次 poll 时获取 Task 对象, 并启动一个线程去 sleep, 在 sleep 过后调用一下 notify. 同时 poll 立刻返回 NotReady.

那么可以这样写:

match self.handle {
    None => {
        let task = futures::task::current();
        let dur = self.dur.clone();

        let handle = spawn(move || {
            sleep(dur);

            task.notify();
        });

        self.handle = Some(handle);

        Ok(Async::NotReady)
    }
    Some(_) => {
        Ok(Async::Ready(()))
    }
}

当第一次 poll 时, 启动线程什么也不做, 单纯的 sleep, sleep 完成后调用 notify 调度第二次 poll. 此时进入 match 的第二部分, 返回了 Ready.

可以看出有点像是一个状态机, Future 维持一个状态, 每次 poll 调用发生时, 根据当前的状态执行不同的代码.

来让他跑起来

fn main() {
    let mut list_of_delay = vec![];

    for _ in 0..10 {
        list_of_delay.push(Delay::new(Duration::from_secs(5)))
    }

    let all = join_all(list_of_delay);

    let now = Instant::now();
    all.wait().ok();
    println!("time elapsed: {:?}", now.elapsed());
}

我们创建了十个 Delay 对象, 并且用 join_all 方法创建了一个新的 Future, 这个Future 会在这十个 Delay 完成后才完成. 而这个 waitFuture 提供的众多工具方法之一, 效果就是在当前线程阻塞的等待它的完成.

注意

直到我们调用 wait 方法之前, future 都不会运行! 因为如果 poll 方法都没有被调用过一次, 那我们的实现中的线程也不会被创建, 更别说 “运行” 了.

运行结果:

可以看出尽管我们等待了十个 Delay, 每个 Delay 都是延迟 5s, 但总时间却不是 50s.

编写一个 Runtime (让 Future 跑起来)

之前只是实现了一个 Future, 但还没有讨论如何让他运行起来, 尽管我们用 wait 方法让这个 Future 运行了下. wait 实现原理则是使用一个内置的 Runtime 去运行这个 Future, 并且等待他的完成. 但我们还是不知道他究竟是怎么跑起来的. 除非我们能自己编写一个 Runtime.

编写一个实验性的 Runtime 不是什么难事.

TODO: 实现一个 Runtime.

Future 0.3

尽管目前为止, futures 0.1 仍然在被广泛使用, 但他将会在不久的将来被 0.3 代替, 所以我们还是来看看 0.3 吧.

0.1 的时候, Future 的定义是让 futures 这个社区库来做的, 并且获取 Task 的方法在我看来是一个非常 hack 风格的做法, 在启用 std 的情况下使用了线程局部存储来实现, 而在 no_std 的情况下则用了其他的方法来实现.

同时 rust 官方和社区都在考虑着将 Future 核心并入到标准库中, 并且这个目标在 1.36 的时候做到了.

当然社区也在考虑对 Future 进行改进, 改进成了现在的形式:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

将原先的两个关联类型改成了只有一个 Output, 而返回类型 Poll 不再是 Result 的别名, 他现在是一个枚举, 相当于以前的 Async 枚举.

pub enum Poll<T> {
    Ready(T),
    Pending,
}

我们还注意到 poll 方法新增了一个参数 Context, 来取代旧版的 task::current().

据说在演化早期, 第二个参数原本是 Waker 类型, 而社区的人尤其是 tokio 社区认为, 为了方便以后的扩展, 建议将 Waker 包裹在 Context 类型, 扩展时只需要对 Context 进行修改而不会破坏原有的兼容性. 尽管官方尝试反对这个提议, 但最终还是改成了现在的样子.

而原先的 self 的类型也被改成了 Pin, 不过 Pin 和本文无关, 也就不讨论了.

futures 的其他部分则没有被合并到标准库中.

注意

标准库只提供了 Future 的抽象, 而完全不管 Future 应该怎么运行. 将后者交给社区来实现, 例如 tokio, async-std

来改写一下之前写的 Delay.

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.handle {
            None => {
                let waker = cx.waker().clone();
                let dur = self.dur.clone();

                let handle = spawn(move || {
                    sleep(dur);
                    waker.wake();
                });
                self.handle = Some(handle);

                Poll::Pending
            }

            Some(_) => {
                Poll::Ready(())
            }
        }
    }
}

0.1 的代码比起来, 需要改动的只是 Future 的实现, 并且注意从 std::future 中导入 trait Future.

与之前的变化不大, 仅仅是从 task::current() 改成了 Context::waker(), 和返回类型的修改.

到这里为止, 不需要再像以前一样依赖于 futures 库了, 但这并不意味着 futures 库真的不需要了.

再来测试一下

fn main() {
    let mut list_of_delay = vec![];
    for _ in 0..10 {
        list_of_delay.push(Delay::new(Duration::from_secs(5)));
    }

    let all = futures_util::future::join_all(list_of_delay);

    let now = Instant::now();
    futures_executor::block_on(all);
    println!("time elapsed: {:?}", now.elapsed());
}

就像之前说的, 标准库并没有提供运行 Future 的方式, 要想运行一个或多个 Future, 我们仍然需要依靠外部依赖提供的 Runtime 或者叫 Executor 来完成.

编写一个 Runtime

TODO: 编写一个新版本的 Runtime.

async 与 await

之前只是讨论了关于 Future 的一些细节, 并没有提到 asyncawait 两个关键字.

TODO:

async IO

TODO:

结语

TODO:

发表评论

电子邮件地址不会被公开。 必填项已用*标注