应用:构建一个执行器
Rust 的 Future
s 是懒惰的:除非积极地推动它完成,不然它不会做任何事情。
一种推动 future 完成的方式是在 async
函数中使用 .await
,
但这只是将问题推进了一层,还面临着:谁将运行从顶级 async
函数里返回的 future?
很明显我们需要一个 Future
执行器。
Future
执行器获取一组顶级 Future
s 并在 Future
可取得进展时通过调用 poll
来将它们运行直至完成。
通常,执行器会调用一次 poll
来使 future 开始运行。
当 Future
s 通过调用 wake()
表示它们已就绪时,会被再次放入队列中以便 poll
再次调用,重复直到 Future
完成。
在本章中,我们将编写一个简单的,能够同时运行大量顶级 futures 并驱使其完成的执行器。
在这个例子中,我们依赖于 futures
箱,它提供了 ArcWake
特征,
有了这个特征,我们可以很方便的构建一个 Waker
。编辑 Cargo.toml
添加依赖:
[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures = "0.3"
接下来,我们需要在 src/main.rs
的顶部导入以下路径:
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;
我们将通过将任务发送到通道(channel)上,来使执行器运行它们。 执行器会从通道道中取出事件并运行它。当一个任务已就绪(awoken 状态), 它可以通过通过将自己再次放入通道以便被再次轮询到。
在这个设计中,执行器本身只需要拥有任务通道的接收端。 用户则拥有此通道的发送端,以便生成新的 futures。任务本身只是可以自我重新调度的 futures,所以我们将它和发送端绑定成一对儿,它可以此重新回到任务队列中。
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need to use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
同时,让我们也给 spawner 添加一个新方法,使它可以方便地生成新的 futures。
这个方法将接收一个 future 类型,放入智能指针 box 中,并在创建一个新的 Arc<Task>
以便它可以添加到执行器的队列中。
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
我们需要创建一个 Waker
来轮询 futures。之前在 唤醒任务
中提到过,一旦任务的 wake
被调用,Waker
就会安排再次轮询它。请记住,
Waker
会准确的告知执行器哪个任务已就绪,这样就会只轮询已就绪的 futures。
创建一个 Waker
最简单的方法,就是实现 ArcWake
特征,之后使用 waker_ref
或 .into_waker
方法来将一个 Arc<impl ArcWake>
转化成 Waker
。
下面让我们为 Task 实现 ArcWake
以便将它们转化成可唤醒的 Waker
s。
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}
当从 Arc<Task>
创建 Waker
后,调用其 wake()
将拷贝一份 Arc
并将之发送到任务通道。之后执行器会取得这个任务并轮询它。让我们来实现它:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if future.as_mut().poll(context).is_pending() {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
恭喜!现在我们就有了一个可工作的 futures 执行器。
我们甚至可以使用它去运行 async/.await
代码和自定义的 futures,
比如说之前完成的 TimerFuture
。
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}