通过 Waker
唤醒任务
futures 在第一次被 poll
时是未就就绪状态是很常见的。当出现这种情况时,
futures 需要确保在其就绪后即会被再次轮询。而这是通过 Waker
类型实现的。
每次轮询 future 时,都会将其作为“任务”的一部分进行轮询。 任务是已交由执行器控制的顶级的 future。
Waker
提供了 wake()
方法来告知执行器相关任务需要被唤醒。当调用 wake()
时,
执行器就知道其关联的任务已就绪,并再次轮询那个 future。
Waker
还实现了 clone()
,以便复制和存储。
现在让我们尝试去使用 Waker
来实现一个简单的计时器 future 吧。
应用:构建一个计时器
就此示例而言,我们将在创建计时器时启动一个新线程,并让它休眠一定的时间, 然后在时间窗口结束时给计时器 future 发信号。
首先我们通过 cargo new --lib timer_future
来创建项目并在 src/lib.rs
中添加需导入的功能。
#![allow(unused)] fn main() { use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; }
让我们首先定义这个 future 类型。
此 future 需要一种方法去通知线程计时器已完成且自身已就绪。
我们将使用 Arc<Mutex<..>>
共享值来在线程和 future 之间进行通信。
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
struct SharedState {
/// Whether or not the sleep time has elapsed
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
waker: Option<Waker>,
}
那么现在,让我们开始编写代码来实现 Future
!
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
非常简单,是吧?当这个线程的设置变为 shared_state.completed = true
,就完成了!
否则,我们将克隆当前任务的 Waker
并把它放置在 shared_state.waker
中,
以便线程可再次唤醒任务。
我们必须在每次轮询完 future 后更新 Waker
,
因为 future 可能被转移到不同的任务中使用不同的 Waker
了,这点非常重要。
在 futures 被轮询后在任务间传递时,就会发生这种情况。
最后,我们需要一个 API 来实际上构建计时器并启动线程:
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
哈!以上便是我们构建一个简单的计时器 future 所需的全部组件。 现在,我们只需要一个执行器来运行它了...