< 返回版块

2019-06-02 19:41    责任编辑:jay

标签:rust,async

本文转载自: https://zhuanlan.zhihu.com/p/66028983

Rust Async: 标准库futures api解析

赖智超

赖智超

53 人赞同了该文章

Rust对 async/await的支持正在如火如荼的进行中,目前futures api已经稳定了。在rust中使用异步有三个基础的组件:

  1. Future:用户的程序要表达异步逻辑所构造的对象。要么是通过基础的 Future经过各种组合子构造,要么通过 async/await构造;
  2. Executor:用户构造出的 Future最终需要提交到 Executor中执行;
  3. Reactor:在 Future执行过程中无法推进需要等待时,需要将 Executor提供的 Waker注册在 Reactor上, Reactor负责监听 Future是否_Ready_, 如果_Ready_,则通过 Waker通知 Executor继续执行对应的 Future

值得注意的是为了能够稳定地迭代,标准库里只保守地稳定了 FutureWaker相关的api,但这已经足够了:用户以及后续的 async/await特性 可以通过 Future表达异步逻辑,生态库可以通过Waker api构建相应的 ExecutorReactor

下面看看目前在标准库中稳定的api。

Future定义

pub trait Future {
 /// future 结束时产生的结果类型
    type Output;

 /// 返回 `Poll::Pending`表示需要等待
 /// 返回`Poll::Ready(val)`表示异步已完成
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

// Context结构目前其实就当作Waker用,主要是考虑向前兼容性,以防后面需要增加其他内容。
pub struct Context<'a> {
 Waker: &'a Waker,
    //...
}

Future本身定义比较简单:实现一个poll的方法,参数包含了Executor传递的Waker。如果整个异步完成了,则返回相应的结果,如果需要等待, 则将Context中的 Waker注册到底层的Reactor中。

Future的可组合性

上面提到Future的poll方法在返回Poll::Pending时,需要将Waker注册到Reactor中,也就是说Future和Reactor是耦合在一起的(要么把 Reactor的handle保存在 Future里,要么保存在一个全局变量里面,让poll函数里调用相应的Reactor::register方法注册 Waker),如果每个Future都需要考虑Reactor注册 Waker的事情,那写起来岂不是很麻烦?

好在Future有一个特性就是可以组合,比如现在假定有个 ReadBytesFutFuture:从网络里读取4个byte; 那么我们可以组合衍生出其他 Future,比如 ReadUint32Fut:从网络里读取一个Uint32整数,先通过 ReadBytesFut读出4个byte,再通过小端序转为Uint32;

struct ReadBytesFut{ /*...*/ }
impl Unpin for ReadBytesFut;

struct ReadUint32Fut{
    inner: ReadBytesFut,
}

impl Future for ReadUint32Fut {
    type Output = Result<u32, Error>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let buf = ready!(self.inner.poll_unpin(ctx))?;
        Poll::Ready(little_endian_decode(buf))
    }
}

所以 ReadUint32Fut本身逻辑非常干净,并没有和 Reactor直接交互,而是将锅甩到内部更底层的 Future上。整个异步的task逻辑都是通过类似的方式不断组合嵌套组装起来的,最终构造一个类似于一个树形的结构的task,放进 Executor里执行, Executor则构造好 Waker,从树形结构的根节点调用 poll函数,然后一层层将 Waker向下传递甩锅直到树的叶子节点,而叶子节点就是那些最基础的 Future(比如网络io,定时器等等),由这些基础的 Future负责和 Reactor打交道。 理解了这个之后, futures-rs库中提供的很多 Future的组合子的源码看起来就毫不费力了。

Waker的设计

Waker的api设计是社区当时争论比较激烈的地方,在前一个版本中包含了 Waker, LocalWaker, UnsafeWakeWake等众多概念,非常复杂。其本身的复杂性主要来自于对性能的追求,由于 ExectuorReactor是两个独立的组件,不能假定两者是在同一个线程,因此 Executor提供的 Waker要能够在其他线程调用,也要能够同时被多次注册(比如一边监听网络io,一边监听定时器超时),自然需要使Waker满足 Send+Sync+Clone, 但是如果 ReactorExecutor在同一个线程的话,调用 Waker.wake时,可以走点捷径(比如不需要跨线程间的同步等),提高性能,所以就多出来了个 LocalWaker的概念。

经过各方权衡,标准库最终采纳的api在性能和简洁性上进行折中。

下面就api的设计思路作一些阐述:

首先,Waker是由Executor提供的,不同的 Executor有不同的实现。因此其必然是需要动态派发的能力, traitobject可以做到,不过还需要支持 Send+Sync+Clone就有点问题了:首先 Clone方法不满足 objectsafety,其次要满足 Send+Sync,使用 Arc<dynWake>在内存管理上不够灵活,因为Arc只有在std里有,对于嵌入式环境就没法使用了。所以标准库采用自定义虚函数表的方式进行动态派发。

pub struct Waker {
 Waker: RawWaker,
}

unsafe impl Send for Waker {}
unsafe impl Sync for Waker {}

impl Waker {
    pub fn wake(self){}
    pub fn wake_by_ref(&self) {}
}

impl Clone for Waker {
    fn clone(&self) -> Self { }
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}
pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

Waker结构体提供了 Send+Sync+Clone的实现,给 FutureReactor使用,内部封装 RawWaker,包含了具体数据和一个静态的虚函数表,虚函数表包含了 clonedrop两个函数指针用于自定义的内存管理, wakewake_by_ref用于实现唤醒 Future的功能。

对于为单线程设计的 Executor,其 Waker只能在 Executor所在线程上调用,而 Waker本身又必须要实现 Send+Sync,也就是说从静态编译层面无法避免 Waker发送到其他线程, Waker::wake函数需要在运行时进行判断,因此会有一点额外的开销,官方认为这些开销非常小,但api却得到了很大的简化。

发布于 2019-05-16