Executors and System IO

Previously, in the section on the Future trait, we discussed this example of a future that performs an asynchronous read on a socket:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

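This future reads whatever data is available on a socket. If no data is available, it yields to the executor and asks for its task to be woken once the socket becomes readable again. However, this example does not make clear how the Socket type is implemented, and in particular it is not obvious how the set_readable_callback function works. How can we arrange for wake() to be called once the socket becomes readable? One option is to have a thread that continually checks whether the socket is readable and calls wake() when it is. However, this is quite inefficient, because it requires a separate thread for each blocked IO future, which would greatly reduce the efficiency of our async code.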
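To make the cost of that naive approach concrete, here is a minimal sketch of a watcher thread. The name `spawn_readiness_watcher` and the `AtomicBool` flag are assumptions made for illustration: the flag stands in for `Socket::has_data_to_read`, and the closure stands in for `wake`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawn a dedicated thread that repeatedly checks `readable` and calls
/// `wake` once it becomes true. `readable` is a hypothetical stand-in for
/// `Socket::has_data_to_read`.
fn spawn_readiness_watcher(
    readable: Arc<AtomicBool>,
    wake: impl FnOnce() + Send + 'static,
) -> JoinHandle<()> {
    thread::spawn(move || {
        // One whole OS thread is tied up here for a single blocked future.
        while !readable.load(Ordering::Acquire) {
            thread::sleep(Duration::from_millis(1));
        }
        wake();
    })
}
```

Every pending read would need its own call to `spawn_readiness_watcher`, so a thousand idle sockets would mean a thousand threads that mostly sleep and re-check.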

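In practice, this problem is solved by integrating with an IO-aware system blocking primitive, such as epoll on Linux, kqueue on FreeBSD and macOS, IOCP on Windows, and ports on Fuchsia (all of which are exposed through the cross-platform Rust crate mio). These primitives all allow a single thread to block on multiple asynchronous IO events, returning as soon as one of the events completes. Such APIs usually look something like this: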

struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        // The object on which the event will occur
        io_object: &IoObject,

        // A set of signals that may appear on the `io_object` for
        // which an event should be triggered, paired with
        // an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
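The blocking behaviour described above can be imitated with the standard library alone. The following is only a sketch under stated assumptions: `ChannelBlocker`, `Signal`, and `source` are hypothetical names, and an `mpsc` channel stands in for the operating system primitive. It shows the shape of the idea (many event sources, one thread blocked in `block`) and not how epoll, kqueue, or IOCP work internally.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Which readiness signal fired (a simplified stand-in for `Signals`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Signal {
    Readable,
    Writable,
}

/// An event carrying the ID of the source it came from.
#[derive(Debug, PartialEq, Eq)]
struct Event {
    id: usize,
    signal: Signal,
}

/// Simulated blocker: event sources push into a channel, and `block`
/// waits on the receiving end.
struct ChannelBlocker {
    sender: Sender<Event>,
    receiver: Receiver<Event>,
}

impl ChannelBlocker {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel();
        ChannelBlocker { sender, receiver }
    }

    /// Hand out a handle that a simulated IO source uses to report readiness.
    fn source(&self) -> Sender<Event> {
        self.sender.clone()
    }

    /// Block the calling thread until any source reports an event.
    fn block(&self) -> Event {
        // This cannot fail, because `self.sender` keeps the channel open.
        self.receiver
            .recv()
            .expect("channel is kept open by self.sender")
    }
}
```

Any number of threads can hold a `Sender` obtained from `source`, while a single thread calls `block` in a loop and handles events in the order they arrive.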

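Futures executors can use these primitives to provide asynchronous IO objects, such as sockets, that can be configured to run a callback when a particular IO event occurs. For the SocketRead example above, the Socket::set_readable_callback function might look like the following pseudocode: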

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

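We can now have just one executor thread that receives any IO event and dispatches it to the appropriate Waker. This wakes the corresponding task, allowing the executor to drive more tasks towards completion before it returns to check for more IO events (and the cycle continues...).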
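The dispatch step of that cycle can be sketched as follows. This is an illustration under assumptions, not a real executor: `MockIoBlocker` replays a fixed queue of events instead of blocking on the operating system, `CountingTask` only counts how often it is woken, and `dispatch_events` is a hypothetical helper name. The `Wake` trait and `Waker` type come from `std::task`.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for an event returned by the OS primitive.
struct Event {
    id: usize,
}

/// Hypothetical stand-in for `IoBlocker`: it replays a fixed queue of
/// events instead of blocking on epoll, kqueue, or IOCP.
struct MockIoBlocker {
    pending: VecDeque<Event>,
}

impl MockIoBlocker {
    /// Return the next "ready" event, or `None` once the queue is drained.
    fn block(&mut self) -> Option<Event> {
        self.pending.pop_front()
    }
}

/// A task stand-in that only counts how many times it was woken.
struct CountingTask {
    wakes: AtomicUsize,
}

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drain the blocker, waking the registered waker for each event.
/// Returns the number of events that had a waker registered.
fn dispatch_events(
    blocker: &mut MockIoBlocker,
    event_map: &mut HashMap<usize, Waker>,
) -> usize {
    let mut dispatched = 0;
    while let Some(event) = blocker.block() {
        // Interest is one-shot in this sketch: the waker is removed when fired.
        if let Some(waker) = event_map.remove(&event.id) {
            waker.wake();
            dispatched += 1;
        }
    }
    dispatched
}
```

A real executor would poll the woken tasks after each dispatch and then block again, and its `block` call would wait on the operating system instead of returning `None`. Events with no registered waker are silently ignored here, which is a design choice of the sketch.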