1

我有一个看起来像这样的代码

use itertools::Itertools;

let (tx, rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
    for (v1, v2) in rx.into_iter().tuple_windows() {
        // do some computation
    }
}

for v in (0..) {
    tx.send(v).unwrap();
}

当我将频道更改为 atokio::mpsc::channel()时,它变成了一个没有适配器rx的异步流(即)futures::Stream.tuple_windows()

你知道提供与 s 类似功能的 crateItertoolsStream?如果没有,您如何建议这样做?

4

2 回答 2

1

有一个StreamExt来自期货。那里没有 Windows 功能,但您可以使用它来实现您自己的扩展。

像:_

use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;

#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
    async fn tuples(
        self: &mut Pin<Box<Self>>,
    ) -> (
        Option<<Self as futures::Stream>::Item>,
        Option<<Self as futures::Stream>::Item>,
    )
    where
        <Self as futures::Stream>::Item: Send,
    {
        let a = self.next().await;
        let b = self.next().await;
        (a, b)
    }
}

操场

于 2022-01-06T08:33:41.290 回答
0

在这里,我最终遵循@Netwave 的答案并实现了我自己的扩展,实际实现有点不同,以便产生一个滑动窗口(如.tuple_windows()for Itertools):[0, 1, 2, 3]->[(0, 1), (1, 2), (2, 3)]

这不是微不足道的,所以这里适合任何可能需要它的人

impl<T: Stream> TupleWindowsExt for T {}
trait TupleWindowsExt: Stream {
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
    {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };

        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }
}


于 2022-01-07T15:33:04.217 回答