前言
Rust 1.39 发布已经快一个月了, 而同时 futures 0.3
也已经正式发布, 在之前查看相关资料的时候, 都是在说 Future
怎么使用, 或者是如何实现 Future trait
, 很少有看到讲 Future
是怎么运作起来的. 于是今天想来总结一下 rust
异步的一些东西.
Future
在 Rust 中的异步是靠着 trait Future
来实现的, 它的描述如下
A future represents an asynchronous computation.
一个 future
代表是一个异步计算, 通常来讲一个 future
是一个还没有计算完成的值, 而一个线程可以在它计算的同时去做一些其他的事情, 而不是阻塞的等待它完成. 于是这里就涉及到暂停和恢复两个问题.
1) 什么时候暂停/该怎么暂停
2) 什么时候恢复执行/该怎么恢复
这有点像进程调度, 使用 N 个 CPU 核心来调度 M 个进程. 同理 Future
就是使用 N 个线程去调度 M 个 Future
对象, 在适当的时候暂停一个 Future
切换到其他的任务, 以及在一个合适的时间切换回去.
而如何实现暂停和恢复就是接下来要讲述的.
Futures 0.1
突然说到 futures 0.1
有点奇怪, 毕竟 0.3 都已经发布了, 但了解下历史也不是什么坏处.
在 futures
的 0.1 版本, trait Future
大概是这么个定义:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
这除了 poll
函数需要用户自己来实现, 其他的都不需要用户实现, 更像是工具类的方法.
首先是 poll
的返回类型, 一个 Poll
, 他其实是 Result
的一个别名:
type Poll<T, E> = Result<Async<T>, E>;
那么 poll
是什么? 前面说过了一个 Future
是一个异步的计算, 他不应该阻塞线程, 换句话说, poll
函数内的代码应该是执行时间短的. 它的返回值指示了这个 future
的执行状态, 可以是已经完成, 也可以是还没完成的状态. 这是通过 Async
枚举来完成的, 它有两种类型, Ready
和 NotReady
.
也就是说 poll
方法就是这个异步操作的实现部分, 在 poll
方法中我们将一个可能需要很长时间才能完成的操作放到一个新的线程中执行, 而 poll
根据这个操作是否完成来返回 Ready
和 NotReady
两种状态.
当 poll
方法返回一个 NotReady
的时候, 相当于是告诉调用 poll
的人还没准备好, 你等一会再来调用一次 poll
看看. 这就相当于是一次暂停.
那么恢复呢. “等一会再调用”中的”等一会”究竟得等多久呢?
这时候就需要有一种通知的机制, 当 Future
准备好再次被poll
的时候来通知调用者. 听起来有点熟悉, 回调函数是一种方法, 例如 JavaScript
和 Java
等就是用的这种方法. 但这很容易就会产生回调地狱, 一层套一曾的回调:

而 Future
的做法是, 提供一个 task::current()
方法. 当你在 poll
函数中调用 task::current()
时, 他返回的 Task
对象就是当前 Future
相关联的.
它是怎么做到的?
task::current()
是从一个线程本地存储(thread_local!
) 中拿到的当前的 Task
对象的. 听起来不可思议, 别担心, 当我们在后面尝试编写一个 Runtime
的时候就会明白是怎么做到的了.
当我们拿到了 Task
之后, 就可以将这个 task
传递给其他人, 例如一个后台线程. 它有个 notify
方法, 当他被调用的时候就指示这个 Future
准备好了, 请调用者再次尝试 poll
一下来获取 Future
的计算值.
notify 是怎么通知调用者的?
事实上调用者通常不是用户, 而是一个 Runtime
, notify
函数只需要通知这个 Runtime
就好了.
小总结一下
rust 的异步通常是靠一个 Future
的 trait
来完成的, Future
需要实现一个 poll
方法, poll
方法需要很快的返回, 因为他不能阻塞当前的线程, 根据这个 Future
的计算进度, poll
方法可以返回 Ready
和 NotReady
来指示进度. 并且当计算完成时, 通过 Task::notify()
方法来再次调度 Future
的执行.
尝试来写一个 Future
要想理解上面所说的, 最好的办法是我们自己写一个 Future
的实现.
实现一个 Delay
, 它什么都不做, 仅仅是在一个固定的时间后完成.
pub struct Delay {
dur: Duration,
handle: Option<JoinHandle<()>>,
}
impl Delay {
pub fn new(dur: Duration) -> Self {
Delay {
dur,
handle: None,
}
}
}
impl Future for Delay {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
unimplemented!()
}
}
就像这样, 因为 Delay
它不计算出任何值, 为了简单我们不考虑 Delay
会出错. 因此两个关联类型都是 ()
.
根据我们之前的知识, poll
的实现应该是, 第一次 poll
时获取 Task
对象, 并启动一个线程去 sleep, 在 sleep 过后调用一下 notify
. 同时 poll
立刻返回 NotReady
.
那么可以这样写:
match self.handle {
None => {
let task = futures::task::current();
let dur = self.dur.clone();
let handle = spawn(move || {
sleep(dur);
task.notify();
});
self.handle = Some(handle);
Ok(Async::NotReady)
}
Some(_) => {
Ok(Async::Ready(()))
}
}
当第一次 poll
时, 启动线程什么也不做, 单纯的 sleep
, sleep
完成后调用 notify
调度第二次 poll
. 此时进入 match
的第二部分, 返回了 Ready
.
可以看出有点像是一个状态机, Future
维持一个状态, 每次 poll
调用发生时, 根据当前的状态执行不同的代码.
来让他跑起来
fn main() {
let mut list_of_delay = vec![];
for _ in 0..10 {
list_of_delay.push(Delay::new(Duration::from_secs(5)))
}
let all = join_all(list_of_delay);
let now = Instant::now();
all.wait().ok();
println!("time elapsed: {:?}", now.elapsed());
}
我们创建了十个 Delay
对象, 并且用 join_all
方法创建了一个新的 Future
, 这个Future
会在这十个 Delay
完成后才完成. 而这个 wait
是 Future
提供的众多工具方法之一, 效果就是在当前线程阻塞的等待它的完成.
注意
直到我们调用 wait
方法之前, future
都不会运行! 因为如果 poll
方法都没有被调用过一次, 那我们的实现中的线程也不会被创建, 更别说 “运行” 了.
运行结果:

可以看出尽管我们等待了十个 Delay
, 每个 Delay
都是延迟 5s, 但总时间却不是 50s.
编写一个 Runtime (让 Future 跑起来)
之前只是实现了一个 Future
, 但还没有讨论如何让他运行起来, 尽管我们用 wait
方法让这个 Future
运行了下. wait
实现原理则是使用一个内置的 Runtime
去运行这个 Future
, 并且等待他的完成. 但我们还是不知道他究竟是怎么跑起来的. 除非我们能自己编写一个 Runtime
.
编写一个实验性的 Runtime
不是什么难事.
TODO: 实现一个 Runtime
.
Future 0.3
尽管目前为止, futures 0.1
仍然在被广泛使用, 但他将会在不久的将来被 0.3
代替, 所以我们还是来看看 0.3
吧.
在 0.1
的时候, Future
的定义是让 futures
这个社区库来做的, 并且获取 Task
的方法在我看来是一个非常 hack
风格的做法, 在启用 std
的情况下使用了线程局部存储来实现, 而在 no_std
的情况下则用了其他的方法来实现.
同时 rust
官方和社区都在考虑着将 Future
核心并入到标准库中, 并且这个目标在 1.36 的时候做到了.
当然社区也在考虑对 Future
进行改进, 改进成了现在的形式:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
将原先的两个关联类型改成了只有一个 Output
, 而返回类型 Poll
不再是 Result
的别名, 他现在是一个枚举, 相当于以前的 Async
枚举.
pub enum Poll<T> {
Ready(T),
Pending,
}
我们还注意到 poll
方法新增了一个参数 Context
, 来取代旧版的 task::current()
.
据说在演化早期, 第二个参数原本是 Waker
类型, 而社区的人尤其是 tokio
社区认为, 为了方便以后的扩展, 建议将 Waker
包裹在 Context
类型, 扩展时只需要对 Context
进行修改而不会破坏原有的兼容性. 尽管官方尝试反对这个提议, 但最终还是改成了现在的样子.
而原先的 self
的类型也被改成了 Pin
, 不过 Pin
和本文无关, 也就不讨论了.
而 futures
的其他部分则没有被合并到标准库中.
注意
标准库只提供了 Future
的抽象, 而完全不管 Future
应该怎么运行. 将后者交给社区来实现, 例如 tokio
, async-std
等
来改写一下之前写的 Delay
.
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.handle {
None => {
let waker = cx.waker().clone();
let dur = self.dur.clone();
let handle = spawn(move || {
sleep(dur);
waker.wake();
});
self.handle = Some(handle);
Poll::Pending
}
Some(_) => {
Poll::Ready(())
}
}
}
}
和 0.1
的代码比起来, 需要改动的只是 Future
的实现, 并且注意从 std::future
中导入 trait Future
.
与之前的变化不大, 仅仅是从 task::current()
改成了 Context::waker()
, 和返回类型的修改.
到这里为止, 不需要再像以前一样依赖于 futures
库了, 但这并不意味着 futures
库真的不需要了.
再来测试一下
fn main() {
let mut list_of_delay = vec![];
for _ in 0..10 {
list_of_delay.push(Delay::new(Duration::from_secs(5)));
}
let all = futures_util::future::join_all(list_of_delay);
let now = Instant::now();
futures_executor::block_on(all);
println!("time elapsed: {:?}", now.elapsed());
}
就像之前说的, 标准库并没有提供运行 Future
的方式, 要想运行一个或多个 Future
, 我们仍然需要依靠外部依赖提供的 Runtime
或者叫 Executor
来完成.
编写一个 Runtime
TODO: 编写一个新版本的 Runtime.
async 与 await
之前只是讨论了关于 Future
的一些细节, 并没有提到 async
和 await
两个关键字.