并发地处理连接
目前我们的代码中的问题是 listener.incoming()
是一个阻塞的迭代器。
执行器无法在 listener
等待一个入站连接时,运行其它 futures,
这便导致了我们只有等之前的请求完成,才能处理新的连接。
我们可以通过将 listener.incoming()
从一个阻塞迭代器转变为一个非阻塞流。
流类似于迭代器,但它是可用于异步。
详情可回看Streams。
让我们使用 async-std::net::TcpListener
替代 std::net::TcpListener
,
并更新我们的连接处理函数,让它接受 async_std::net::TcpStream
:
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
这个异步版本的 TcpListener
为 listener.incoming()
实现了 Stream
特征,
这带来了两个好处。其一,listener.incoming()
不会再阻塞执行器了。
在入站的 TCP 连接无法取得进展时,它可以允许其它挂起的 futures 去执行。
第二个好处是,可以使用 Stream
的 for_each_concurrent
方法,来并发地处理来自
Stream
的元素。在这里,我们将利用这个方法来并发处理每个传入的请求。
我们需要从 futures
箱中导入 Stream
特征,现在 Cargo.toml 看起来是这样的:
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
现在,我们可以通过闭包函数传入 handle_connection
来并发处理每个连接。
闭包函数将获得每个 TcpStream
的所有权,并在新的 TcpStream
就绪时立即执行。
因为 handle_connection
不再是阻塞的,一个慢请求不会阻止其它请求的完成。
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
并行处理请求
到目前为止,我们的示例在很大程度上,将并发(通过异步代码)
作为并行(使用线程)的替代方案。但是,异步代码和线程并非互斥。
在我们的示例中,for_each_concurrent
并发地在同一个进程中处理每个连接。
但 async-std
箱也允许我们去在特定的线程上生成任务。
因为 handle_connection
是可 Send
且是非阻塞的,所以它可以安全地使用
async_std::task::spawn
。代码是这样的:
use async_std::task::spawn; #[async_std::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); listener .incoming() .for_each_concurrent(/* limit */ None, |stream| async move { let stream = stream.unwrap(); spawn(handle_connection(stream)); }) .await; }
现在我们可以同时使用并发和并行来同时处理多个连接!详情可查看 多线程执行器。