并发地处理连接

目前我们的代码中的问题是 listener.incoming() 是一个阻塞的迭代器。 执行器无法在 listener 等待一个入站连接时,运行其它 futures, 这便导致了我们只有等之前的请求完成,才能处理新的连接。

我们可以通过将 listener.incoming() 从一个阻塞迭代器转变为一个非阻塞流。 流类似于迭代器,但它是可用于异步。 详情可回看Streams

让我们使用 async-std::net::TcpListener 替代 std::net::TcpListener, 并更新我们的连接处理函数,让它接受 async_std::net::TcpStream

use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

这个异步版本的 TcpListenerlistener.incoming() 实现了 Stream 特征, 这带来了两个好处。其一,listener.incoming() 不会再阻塞执行器了。 在入站的 TCP 连接无法取得进展时,它可以允许其它挂起的 futures 去执行。

第二个好处是,可以使用 Streamfor_each_concurrent 方法,来并发地处理来自 Stream 的元素。在这里,我们将利用这个方法来并发处理每个传入的请求。 我们需要从 futures 箱中导入 Stream 特征,现在 Cargo.toml 看起来是这样的:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

现在,我们可以通过闭包函数传入 handle_connection 来并发处理每个连接。 闭包函数将获得每个 TcpStream 的所有权,并在新的 TcpStream 就绪时立即执行。 因为 handle_connection 不再是阻塞的,一个慢请求不会阻止其它请求的完成。

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

并行处理请求

到目前为止,我们的示例在很大程度上,将并发(通过异步代码) 作为并行(使用线程)的替代方案。但是,异步代码和线程并非互斥。 在我们的示例中,for_each_concurrent 并发地在同一个进程中处理每个连接。 但 async-std 箱也允许我们去在特定的线程上生成任务。 因为 handle_connection 是可 Send 且是非阻塞的,所以它可以安全地使用 async_std::task::spawn。代码是这样的:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

现在我们可以同时使用并发和并行来同时处理多个连接!详情可查看 多线程执行器