我正在尝试使用async_std从网络接收UDP数据报.
I'm trying to use async_std to receive UDP datagrams from the network.
有一个UdpSocket实现了 async recv_from ,此方法返回将来,但我需要一个async_std::stream::Stream来提供UDP数据报流,因为它是更好的抽象.
There is a UdpSocket that implements async recv_from, this method returns a future but I need a async_std::stream::Stream that gives a stream of UDP datagrams because it is a better abstraction.
我找到了 tokio::net::UdpFramed 完全可以满足我的需要,但是在当前版本的tokio中不可用.
I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.
一般来说,问题是如何将Future从给定的异步函数转换为Stream?
Generally speaking the question is how do I convert Futures from a given async function into a Stream?
推荐答案对于单个项目,请使用 FutureExt::into_stream :
For a single item, use FutureExt::into_stream:
use futures::prelude::*; // 0.3.1 fn outer() -> impl Stream<Item = i32> { inner().into_stream() } async fn inner() -> i32 { 42 }对于关闭产生的大量期货的流,请使用 stream::unfold :
For a stream from a number of futures generated by a closure, use stream::unfold:
use futures::prelude::*; // 0.3.1 fn outer() -> impl Stream<Item = i32> { stream::unfold((), |()| async { Some((inner().await, ())) }) } async fn inner() -> i32 { 42 }
根据您的情况,您可以使用stream::unfold:
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"] use futures::prelude::*; // 0.3.1 fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> { stream::unfold(s, |s| { async { let data = read_one(&s).await; Some((data, s)) } }) } async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> { let mut data = vec![0; 1024]; let (len, _) = s.recv_from(&mut data).await?; data.truncate(len); Ok(data) } #[async_std::main] async fn main() -> io::Result<()> { let s = UdpSocket::bind("0.0.0.0:9876").await?; read_many(s) .for_each(|d| { async { match d { Ok(d) => match std::str::from_utf8(&d) { Ok(s) => println!("{}", s), Err(_) => println!("{:x?}", d), }, Err(e) => eprintln!("Error: {}", e), } } }) .await; Ok(()) }更多推荐
如何将Future转换为Stream?
发布评论