迭代和并发
与同步中的 Iterator
s 类似,对 Stream
中的值进行迭代与处理的方法有多种。
有组合器风格的方法如 map
、filter
和 fold
,以及在它们错误时退出的变种
try_map
、try_filter
和 try_fold
。
不幸的是,Stream
s 不能使用 for
循环,而只能使用命令式风格的代码,像
while let
和 next
/try_next
函数:
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
但是,如果我们每次只处理一个元素,这样就潜在地留下了产生并发的机会,
毕竟这也就是我们首先编写异步代码的原因。在一个 stream 并发中,
可以使用 for_each_concurrent
和 try_for_each_concurrent
函数来处理多个项目:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}