0

我需要实现一个Stream在滑动窗口中产生项目的自定义(即 [1, 2, 3] => [(1, 2), (2, 3)])。所以我实现并给了它一个名为.tuple_windows(). 允许以下代码

let iter = stream::iter(0..=3);
assert_eq!(
    iter.tuple_windows().collect::<Vec<_>>().await,
    vec![(0, 1), (1, 2), (2, 3)]
)

当最终类型没有实现特征时,我遇到了一个奇怪的情况Stream

代码(操场):

use anyhow; // 1.0.52
use futures; // 0.3.19
use futures::{stream, Stream, StreamExt};
use pin_project_lite;
use pin_project_lite::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use tokio; // 1.15.0 // 0.2.8
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = stream::iter(0..20)
        .map(|_| stream::iter(2..10)) // this works with the custom Stream
        //      .map(|_| stream::iter(2..10).enumerate())   // but this doesn't
        .enumerate(); // this works regardless what happens in `map`
                      //     .tuple_windows(); // this only works with the first map

    while let Some(_) = stream.next().await {}
    Ok(())
}

impl<T: Stream> TupleWindowsExt for T {}
pub trait TupleWindowsExt: Stream + Sized {
    fn tuple_windows(self) -> TupleWindows<Self> {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    pub struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };

        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.stream.size_hint();
        (
            lower.saturating_mul(2),
            upper.and_then(|upper| upper.checked_mul(2)),
        )
    }
}

编译器错误也没有帮助,因为它只告诉我Stream没有为新创建的类型实现:

error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied

我在这里想念什么?

4

2 回答 2

2

您的Stream实现要求内部流中的项目是可克隆的:

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,

在工作案例中stream::iter(0..20).map(|_| stream::iter(2..10)).tuple_windows(),您将futures::stream::Iter<std::ops::Range<i32>>项目流传递给tuple_windows(). 在内部迭代器类型实现时Iter实现。这里的内部迭代器类型是,它确实实现了。CloneClonestd::ops::Range<i32>Clone

当您更改代码以enumerate()在. 根本没有实现(从期货 0.3.19 开始)。map()futures::stream::Enumerate<futures::stream::Iter<std::ops::Range<i32>>>tuple_windows()EnumerateClone

我看不出任何Enumerate无法实现的原因Clone(使用适当的特征界限);我想它没有被实施,因为没有人要求它。

于 2022-01-11T03:25:17.127 回答
0

明白了,Streamimpl 不起作用,因为这些项目不满足Clone我的自定义要求Stream

我应该把Clone边界放在适配器和impl块上

impl<T: ?Sized> TupleWindowsExt for T where T: Stream {}
pub trait TupleWindowsExt: Stream {
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
        Self::Item: Clone,
    {
        TupleWindows::new(self)
    }
}

于 2022-01-11T03:23:01.490 回答