前言
Rust 1.39 发布已经快一个月了, 而同时 futures 0.3
也已经正式发布, 在之前查看相关资料的时候, 都是在说 Future
怎么使用, 或者是如何实现 Future trait
, 很少有看到讲 Future
是怎么运作起来的. 于是今天想来总结一下 rust
异步的一些东西.
Future 0.1 vs 0.3
对于刚刚接触Future
的人来说,首先面对的一个问题就是,0.1和0.3版本有啥区别?关于这个问题我之前在知乎某个文章底下评论过,在这里再重复说一遍。
首先大家可以看一下 rfc 里面关于历史的介绍, https://rust-lang.github.io/rfcs/2592-futures.html#historical-context 在这里。
简单的来说就是,早在2016年的时候,社区出了一款库,名字叫 futures
,版本就是0.1,在这里提出了最早的Future
这个概念,以及对应的 trait。在之后2018年的时候, 大家都知 async/await
关键字开始发展, 于是这个时候 futures
团队起草了一系列的 RFC, 对 Future
进行了很多修改, 最终修改成了 0.3 的样子, 并且将这个 trait 合并到了标准库里面.
因此我们从这个提交 https://github.com/rust-lang/futures-rs/commit/33d6ba46a63ebc137edf47818997be5ce41d5616#diff-541deb29dfb9b7c9148c585d00f14398L3 里面可以看到, 从这个时间上, futures-rs
里面对于 Future
trait的依赖已经变成了标准库, 而不是自己定义的. 而这个时候其实就是 0.3 版本了, 当然 0.3 版本拖了相当长的时间才发布.
这意味着如果你想用 Future
, 其实是不需要引入 futures-rs
这个第三方库的. 不过实际上它提供了很多实用工具, 不引入写起来其实是挺不舒服的.
Future
在 Rust 中所谓的异步其实是靠着 trait Future
来实现的, 它的描述如下
A future represents an asynchronous computation.
一个 future
代表是一个异步计算, 通常来讲一个 future
是一个还没有计算完成的值, 而一个线程可以在它计算的同时去做一些其他的事情, 而不是阻塞的等待它完成. 于是这里就涉及到暂停和恢复两个问题.
1) 什么时候暂停/该怎么暂停
2) 什么时候恢复执行/该怎么恢复
这有点像进程调度, 使用 N 个 CPU 核心来调度 M 个进程. 同理 Future
就是使用 N 个线程去调度 M 个 Future
对象, 在适当的时候暂停一个 Future
切换到其他的任务, 以及在一个合适的时间切换回去.
而如何实现暂停和恢复就是接下来要讲述的.
Futures 0.1
突然说到 futures 0.1
有点奇怪, 毕竟 0.3 都已经发布了, 但了解下历史也不是什么坏处.
在 futures
的 0.1 版本, trait Future
大概是这么个定义:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
这除了 poll
函数需要用户自己来实现, 其他的都不需要用户实现, 例如 and_then
, map
之类的, 这些更像是工具类的方法.
首先是 poll
的返回类型, 一个 Poll
, 他其实是 Result
的一个别名:
type Poll<T, E> = Result<Async<T>, E>;
那么 poll
是什么? 前面说过了一个 Future
是一个异步的计算, 他不应该阻塞线程, 换句话说, poll
函数内的代码应该是执行时间短的, 并且可能会分成很多部分来执行. 它的返回值指示了这个 future
的执行状态, 可以是已经完成, 也可以是还没完成的状态. 这是通过 Async
枚举来完成的, 它有两种类型, Ready
和 NotReady
.
也就是说 poll
方法就是这个异步操作的实现部分, 在 poll
方法中我们将一个可能需要很长时间才能完成的操作放到一个新的线程中执行, 而 poll
根据这个操作是否完成来返回 Ready
和 NotReady
两种状态.
关于新线程
上一段我提到了 “ 放到一个新的线程中执行 ”,这个说法,其实是不恰当的。更准确的说是,poll
要怎么实现是有多种方式的, 也不一定非要开一个新的子线程, 它可以提交任务到一个线程池, 也可以注册到一个事件循环当中. 这个会在后面 tokio
的部分提到.
当 poll
方法返回一个 NotReady
的时候, 相当于是告诉调用 poll
的人还没准备好, 你等一会再来调用一次 poll
看看. 这就相当于是一次暂停.
那么恢复呢. “等一会再调用”中的”等一会”究竟得等多久呢?
这时候就需要有一种通知的机制, 当 Future
准备好再次被poll
的时候来通知调用者.
听起来有点熟悉, 回调函数是一种方法, 例如 JavaScript
和 Java
等就是用的这种方法. 但这很容易就会产生回调地狱, 一层套一曾的回调:
而 Future
的做法是, 提供一个 task::current()
方法. 当你在 poll
函数中调用 task::current()
时, 他返回的 Task
对象就是当前 Future
相关联的. 这个Task
在其他语言里面可能有另一个名字, 例如在 kotlin
里面它叫做Continuation
.
它是怎么做到的?
task::current()
是从一个线程本地存储(thread_local!
) 中拿到的当前的 Task
对象的. 听起来不可思议, 别担心, 当我们在后面尝试编写一个 Runtime
的时候就会明白是怎么做到的了.
当我们拿到了 Task
之后, 就可以将这个 task
传递给其他人, 例如一个后台线程. 它有个 notify
方法, 当他被调用的时候就指示这个 Future
准备好了, 请调用者再次尝试 poll
一下来获取 Future
的计算值.
notify 是怎么通知调用者的?
事实上调用者通常不是用户, 而是一个 Runtime
. 这是因为所有的Future
都会有一个层级, 最顶层的可能是 main
函数, 也可能是在 main
函数里面创建的其他 Future
, 而这个Future
最终都会交给 Runtime
来管理. 例如早期的 tokio::run
, 和现在的 #[tokio::main]
. notify
函数只需要通知这个 Runtime
就好了, 然后Runtime
负责把对应的Future
对象再次执行一次就好了.
例如有一个 Future
是 AsyncRead
, 代表一个异步读取, 当我们首次调用 poll
的时候可能是在 some_io.read(&mut buf)
的时候触发的, 而这个时候IO
还没有准备就绪, 没有什么数据可以让我们读,于是我们的 poll
就返回一个 NotReady
. 而当IO
准备好的时候, 我们就可以通知调用者, hey, 数据准备好了, 你再调用一次 poll
, 我会把数据返回给你.
小总结一下
rust 的异步通常是靠一个 Future
的 trait
来完成的, Future
需要实现一个 poll
方法, poll
方法需要很快的返回, 因为他不能阻塞当前的线程, 根据这个 Future
的计算进度, poll
方法可以返回 Ready
和 NotReady
来指示进度. 并且当计算完成时, 通过 Task::notify()
方法来再次调度 Future
的执行.
尝试来写一个 Future
要想理解上面所说的, 最好的办法是我们自己写一个 Future
的实现.
实现一个 Delay
, 它什么都不做, 仅仅是在一个固定的时间后完成.
pub struct Delay {
dur: Duration,
handle: Option<JoinHandle<()>>,
}
impl Delay {
pub fn new(dur: Duration) -> Self {
Delay {
dur,
handle: None,
}
}
}
impl Future for Delay {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
unimplemented!()
}
}
就像这样, 因为 Delay
它不计算出任何值, 因此它的Item
关联类型是 ()
. 为了简单我们不考虑 Delay
会出错. 因此Error
关联类型也是 ()
.
根据我们之前的知识, poll
的实现应该是, 第一次 poll
时获取 Task
对象, 并启动一个线程去 sleep, 在 sleep 过后调用一下 notify
. 同时 poll
立刻返回 NotReady
.
如果你能很自然的想到这个简单的实现方式, 说明你已经理解了Future
的基本原理了.
那么可以这样写:
match self.handle {
None => {
let task = futures::task::current();
let dur = self.dur.clone();
let handle = spawn(move || {
sleep(dur);
task.notify();
});
self.handle = Some(handle);
Ok(Async::NotReady)
}
Some(_) => {
Ok(Async::Ready(()))
}
}
当第一次 poll
时, 启动线程什么也不做, 单纯的 sleep
, sleep
完成后调用 notify
调度第二次 poll
. 此时进入 match
的第二部分, 返回了 Ready
.
可以看出有点像是一个状态机, Future
维持一个状态, 每次 poll
调用发生时, 根据当前的状态执行不同的代码. 事实上这也是大部分协程的一个实现方式, 通常我们叫做生成器 Generator
.
Generator
实际上标准库里面的确有生成器这个东西, https://doc.rust-lang.org/std/ops/trait.Generator.html 但没有标准化
来让他跑起来
fn main() {
let mut list_of_delay = vec![];
for _ in 0..10 {
list_of_delay.push(Delay::new(Duration::from_secs(5)))
}
let all = join_all(list_of_delay);
let now = Instant::now();
all.wait().ok();
println!("time elapsed: {:?}", now.elapsed());
}
我们创建了十个 Delay
对象, 并且用 join_all
方法创建了一个新的 Future
, 这个Future
会在这十个 Delay
完成后才完成. 而这个 wait
是 Future
提供的众多工具方法之一, 效果就是在当前线程阻塞的等待它的完成.
注意
直到我们调用 wait
方法之前, future
都不会运行! 因为如果 poll
方法都没有被调用过一次, 那我们的实现中的线程也不会被创建, 更别说 “运行” 了.
运行结果:
可以看出尽管我们等待了十个 Delay
, 每个 Delay
都是延迟 5s, 但总时间却不是 50s.
编写一个 Runtime (让 Future 跑起来)
之前只是实现了一个 Future
, 但还没有讨论如何让他运行起来, 尽管我们用 wait
方法让这个 Future
运行了下. wait
实现原理则是使用一个内置的 Runtime
去运行这个 Future
, 并且等待他的完成. 但我们还是不知道他究竟是怎么跑起来的. 除非我们能自己编写一个 Runtime
.
编写一个实验性的 Runtime
不是什么难事. (其实还挺麻烦的)
TODO: 实现一个 Runtime
.
这部分先暂时不实现了, 略微有点复杂, 等之后有空了再写. 如果实在是好奇怎么实现的, 可以看一下这篇文章 https://stjepang.github.io/2020/01/31/build-your-own-executor.html
Future 0.3
尽管目前为止, futures 0.1
仍然在被广泛使用, 但他将会在不久的将来被 0.3
代替, 所以我们还是来看看 0.3
吧.
在 0.1
的时候, Future
的定义是让 futures
这个社区库来做的, 并且获取 Task
的方法在我看来是一个非常 hack
风格的做法, 在启用 std
的情况下使用了线程局部存储来实现, 而在 no_std
的情况下则用了其他的方法来实现.
同时 rust
官方和社区都在考虑着将 Future
核心并入到标准库中, 并且这个目标在 1.36 的时候做到了.
当然社区也在考虑对 Future
进行改进, 改进成了现在的形式:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
将原先的两个关联类型改成了只有一个 Output
, 而返回类型 Poll
不再是 Result
的别名, 他现在是一个枚举, 相当于以前的 Async
枚举.
pub enum Poll<T> {
Ready(T),
Pending,
}
同时在 futures
里面还定义了一个叫做 TryFuture
的 trait
, 长这样
pub trait TryFuture: Sealed + Future {
type Ok;
type Error;
fn try_poll(
self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Result<Self::Ok, Self::Error>>;
}
其实就是把返回类型固定成了 Result
, 而标准库的 Future
的返回类型其实并不总是Result
的.
我们还注意到 poll
方法新增了一个参数 Context
, 这是用来取代旧版的 task::current()
.
据说在演化早期, 第二个参数原本是 Waker
类型, 而社区的人尤其是 tokio
社区认为, 为了方便以后的扩展, 建议将 Waker
包裹在 Context
类型, 扩展时只需要对 Context
进行修改而不会破坏原有的兼容性. 尽管官方尝试反对这个提议, 但最终还是改成了现在的样子. (事实上直到现在为止, Context
里面依然还是只有 Waker
这一个)
而原先的 self
的类型也被改成了 Pin
, 不过 Pin
和本文无关, 也就不讨论了. 关于 Pin
的大家可以看看这个视频 https://www.youtube.com/watch?v=DkMwYxfSYNQ ,三个多小时, 有点长.
而 futures
的其他部分则没有被合并到标准库中.
注意
标准库只提供了 Future
的抽象, 而完全不管 Future
应该怎么运行. 将后者交给社区来实现, 例如 tokio
, async-std
等
来用 0.3 版本的 Future
改写一下之前写的 Delay
.
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.handle {
None => {
let waker = cx.waker().clone();
let dur = self.dur.clone();
let handle = spawn(move || {
sleep(dur);
waker.wake();
});
self.handle = Some(handle);
Poll::Pending
}
Some(_) => {
Poll::Ready(())
}
}
}
}
和 0.1
的代码比起来, 需要改动的只是 Future
的实现, 并且注意从 std::future
中导入 trait Future
.
与之前的变化不大, 仅仅是从 task::current()
改成了 Context::waker()
, 和返回类型的修改.
到这里为止, 不需要再像以前一样依赖于 futures
库了, 但这并不意味着 futures
库真的不需要了.
再来测试一下
fn main() {
let mut list_of_delay = vec![];
for _ in 0..10 {
list_of_delay.push(Delay::new(Duration::from_secs(5)));
}
let all = futures_util::future::join_all(list_of_delay);
let now = Instant::now();
futures_executor::block_on(all);
println!("time elapsed: {:?}", now.elapsed());
}
就像之前说的, 标准库并没有提供运行 Future
的方式, 要想运行一个或多个 Future
, 我们仍然需要依靠外部依赖提供的 Runtime
或者叫 Executor
来完成. 而这里用到了 futures_executor::block_on
这个, 和之前用的 .wait()
其实是差不多的用处.
注意
在上述的 main
函数里用 block_on
是不能执行 tokio
或者 async-std
里面关于IO
的任务的. 例如已经有一个 AsyncRead
对象的时候, .read()
返回的 impl Future
是不能直接放到 block_on
里面的.
这是因为它需要在 tokio
的Runtime
里才可以使用. 如果一定要这样做, 确保是在 Runtime
里面调用的 block_on
, 或者使用 tokio
提供的 toko::runtime::Runtime::block_on
.
编写一个 Runtime
TODO: 编写一个新版本的 Runtime.
async 与 await
之前只是讨论了关于 Future
的一些细节, 并没有提到 async
和 await
两个关键字. 这两个关键字是在 2018 edition 里面标准化的.
async 关键字
先说说 async
关键字, 这个是用来修饰函数、闭包、代码块等用的,用途就是字面意思的,和其他语言的 async
是一样的.
例如在 JavaScript
当中, 我们都知道一个 async
修饰的函数, 它的返回值会变成一个 Promise
. 在 rust
当中这个也是类似的, 只不过 Promise
变成了 Future
.
async fn delay(dur: Duration) -> () {
todo!()
}
例如上述代码其实大致上等价于
fn delay(dur: Duraton) -> impl Future<Output=()> {
todo!()
}
当然 async
不只是修改返回类型这么简单, 他同时还基于函数里面的 await
关键字将原本的函数体修改成了 Generator
, 并且包裹在 GenFuture
里面, 参考 https://github.com/rust-lang/rust/blob/master/library/core/src/future/mod.rs#L61-L88 这部分的说明. 关于 GenFuture
如果你有注意编译错误的话, 是可以在一大堆输出里面见到这个的. 不过现在好像改成 impl Future
之后就见不到了.
await 关键字
其实 await
关键字一直都没标准化主要是社区在讨论它的语法究竟应该设计成什么, https://github.com/rust-lang/rust/issues/57640 具体的 issue在这里.
社区讨论了很多它的语法, 例如
- 作为一个方法的
task.await()
- 作为前置关键字的
await task
- 在
let
上做改动的let! result = task
- 作为后置宏的
task.await!()
总之就是千奇百怪都有, 但这些都不重要, 时间返回到大概2018年的时候, 那个时候的 await
还是一个宏, 当然这个宏是编译器自带的宏, 算是一个黑魔法的东西, 用法大概就是 let result = await!(task)
这样.
而这个 await!
宏它的一个可能实现的伪代码长这样
loop {
let poll = task.poll();
match poll {
Ok(Async::Ready(e)) => {
break Ok(e)
}
Ok(Async::Pending) => {}
Err(e) => {
break Err(e)
}
}
yield Async::Pending
}
注意
真正的 await!
的实现可能并不是这样子的!
也就是设置一个循环, 每次调用一次 poll
, 根据结果来决定是 yield
还是直接 break
出结果. 这个和我们在文章一开头说到的关于 poll
的事情的差不多的意思.
在最后 1.39 的时候, 终于把 .await
的后置语法给标准化了, 不管它的语法变成了什么样子, 但基本上都是靠生成器和Future
trait 来实现的.
TODO: async 和 await 的使用案例以及用 Future 实现组合 Future 的部分.
async IO
终于到了大家喜闻乐见的 IO
的阶段了, 前面花了大把的篇幅来说 Future
, await/async
, Runtime
等东西, 但并没有和实际应用相关联起来. 也就是究竟用 Future
应该怎么实现异步IO.
在前面说 0.1 的时候, 我举了一个 AsyncRead
的例子, 但没有说 IO
没有准备就绪的时候应该怎么做, 以及 IO
准备就绪之后怎么通知. 要说这个, 先提一个大家都听过的东西, epoll
.
epoll
模型很简单, 将你感兴趣的文件描述符注册到 epoll
当中, 然后由一个线程去循环调用 epoll_wait
, 等待对应事件的发生, 并通知应用程序哪些文件描述符产生了你感兴趣的事件, 例如可读和可写.
于是放到我们的异步IO
的实现里, 可以用同样的原理, 启动一个单独的线程, 维护一个类似 epoll
的设施, 并且循环 poll
. 当我们用 AsyncRead
的时候, 在 poll
函数里将对应的文件描述符和 Context::waker()
注册到这个设施里, 然后返回一个 NotReady
. 当事件发生时, 由这个设施弹出对应的 Waker
, 并且唤醒等待IO
的那个 Future
.
这就是 mio
, tokio
库的附属组件, 它的作用就是提供了对非阻塞IO
的系统抽象, 说人话就是它封装了 epoll
, kqueue
, IOCP
等系统提供的功能, 使得作为库的使用者可以不用关心跨平台设计, 用统一的接口去管理IO
事件.
mio
仍然属于一个底层的库, 要使用起来并不容易, 因此 tokio
在 0.1 的时候, 提供了一个 reactor
, 包装了 mio
, 对 TcpStream
和 UdpStream
等进行了整合, 使得用户不用关心怎么实现的, 只需要确保在当我们调用相关联的方法, 例如 read
, write
的时候是在tokio
的Runtime
的内部就好了. 而在 0.2 里面, reactor
已经直接合并到了 Runtime
里面, 通过 enable_io()
方法可以启用 (默认用 all 的时候就已经启用了.
这么好的文章怎么断更了,个人感觉一些地方比async book还要清晰易懂!
期待完结!
哈哈哈, 不好意思, 当时写 executor 的时候遇到了点小困难, 后来就忘记写了… 最近更新的话打算先把这部分跳过, 然后把剩下的部分补全下. 顺便其实之前有很多东西都没有交代清楚, 也打算一块修改下.