Future 特征

Future 特征是 Rust 异步编程的核心要义。 Future 是一种可以产生返回值的异步计算(尽管值可能是空,如())。 Future 特征的简化版本可以是这个样子:

#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

通过调用 poll 函数可以推进 Futures,这将驱使 Future 尽快的完成。 当 future 完成时,将返回 Poll::Ready(result)。如果 future 尚不能完成, 它将返回 Poll::Pending,并安排在 future 在可以取得更多的进展时调用 wake() 函数。 当调用 wake() 时,驱动 Future 的执行器会再次调用 Poll, 以便 Future 取得更多的进展。

如果没有 wake(),执行器将无法得知特定的 future 什么时候可以取得进展, 将不得不去轮询每个 future,有了 wake(),执行器就能准确知道哪个 future 准备好被 poll 了。

例如,想像一下我们需要从一个套接字中读取数据,但它里面可能有数据,也可能为空。 如果有数据,我们可以读取并返回 Poll::Ready(data),但如果是空, 我们的 future 将阻塞住、无法取得进展。所以在无数据时我们必须注册一个 wake 以便套接字上数据准备好时进行调用,它将通知执行器读取套接字数据这个 future 已就绪。 一个简单的 SocketRead future 如下:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

这种 Futures 模型允许将多个异步操作组合起来而无需中间分配。 一次运行多个 futures 或将其链接在一起,可通过无分配状态机实现,如下:

/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed -- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}

这展示了如何在不进行单独分配的情况下同时运行多个 futures, 从而实现更高效的异步程序。同样,多个连续的 futures 也可以顺序地运行,如下:

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future -- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

这些例子展示了如何使用 trait 特征,在无需多个分配的对象和深度嵌套的回调情况下, 来表示异步控制流程。抛开基本的控制流程,让我们来谈谈真正的 Future 特征以及它的不同之处。

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

首先你会看到的是,self 类型已不再是 &mut Self 而是 Pin<&mut Self>。 我们将在后面的章节中详细讨论 pinning, 但现在你只需知道它允许我们创建不可移动的 futures 即可。 不可移动的对象可以在它们的字段之间存储指针,例如 struct MyFut { a: i32, ptr_to_a: *const i32 }Pinning 是启用 async/awiat 所必需的功能。

其次,wake: fn() 变成了 &mut Context<'_>。在 SimpleFuture 中, 我们通过调用函数指针(fn())来通知 future 执行器来对调用 wake 的 future 进行 Poll 操作。然而,因为 fn() 只是一个函数指针而不包含任何数据,所以你无法得知是哪个 Future 在调用 wake

在实际场景中,像 Web 服务器这样的复杂程序可能有成千上万个不同的连接, 而它们的唤醒工作应该分开来进行管理。Context 类型通过提供对 Waker 类型的值的访问解决了这个问题,该值可用于唤醒特定的任务。