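I need to implement a custom Stream that yields items in a sliding window (i.e. [1, 2, 3] => [(1, 2), (2, 3)]). So I implemented one and exposed it through an extension method called .tuple_windows(), which allows the following code: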
let iter = stream::iter(0..=3);
assert_eq!(
    iter.tuple_windows().collect::<Vec<_>>().await,
    vec![(0, 1), (1, 2), (2, 3)]
);
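To pin down the semantics I am after, here is a minimal synchronous sketch of the same sliding window over a plain iterator. `pairwise` is a hypothetical helper used only for illustration; it is not part of my actual Stream implementation.

```rust
/// Hypothetical helper (illustration only): pairs every item with its successor.
fn pairwise<T: Clone>(items: impl IntoIterator<Item = T>) -> Vec<(T, T)> {
    let mut iter = items.into_iter();
    // Without a first item there is nothing to pair.
    let mut previous = match iter.next() {
        Some(first) => first,
        None => return Vec::new(),
    };
    let mut windows = Vec::new();
    for current in iter {
        // Emit (previous, current), then slide the window by one item.
        windows.push((previous, current.clone()));
        previous = current;
    }
    windows
}
```

Calling `pairwise(0..=3)` gives `[(0, 1), (1, 2), (2, 3)]`, which is what the stream version should produce as well. An input with fewer than two items yields no windows.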
I ran into a strange situation in which the resulting type does not implement the Stream trait.
Code (playground):
use anyhow; // 1.0.52
use futures; // 0.3.19
use futures::{stream, Stream, StreamExt};
use pin_project_lite; // 0.2.8
use pin_project_lite::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio; // 1.15.0

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = stream::iter(0..20)
        .map(|_| stream::iter(2..10)) // this works with the custom Stream
        // .map(|_| stream::iter(2..10).enumerate()) // but this doesn't
        .enumerate(); // this works regardless what happens in `map`
        // .tuple_windows(); // this only works with the first map
    while let Some(_) = stream.next().await {}
    Ok(())
}

impl<T: Stream> TupleWindowsExt for T {}

pub trait TupleWindowsExt: Stream + Sized {
    fn tuple_windows(self) -> TupleWindows<Self> {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    pub struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Wait for the next item of the inner stream.
        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };
        if let Some(previous) = this.previous {
            // Pair the stored item with the new one, then remember the new one.
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            // First window: a second item is needed before anything can be yielded.
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    // Keep the first item so the next poll can complete the pair.
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }
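
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, upper) = self.stream.size_hint();
        // Until `previous` is filled, one inner item is consumed without
        // producing a window; afterwards every item yields exactly one.
        let missing = if self.previous.is_some() { 0 } else { 1 };
        (
            lower.saturating_sub(missing),
            upper.map(|upper| upper.saturating_sub(missing)),
        )
    }
}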
The compiler error is not helpful either, as it only tells me that Stream is not implemented for the newly created type:
error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied
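While trying to narrow this down, it looks as though the same kind of error can be provoked without any async code. The sketch below is only an analogue built on `std::iter::Iterator`, not my real code: `Pairs`, `Opaque` and `first_pair_of_ints` are hypothetical names, and I am assuming it mirrors my setup in that the constructor has no `Clone` bound while the trait impl does.

```rust
/// Hypothetical wrapper mirroring `TupleWindows`: no `Clone` bound on the struct.
struct Pairs<I: Iterator> {
    iter: I,
    previous: Option<I::Item>,
}

impl<I: Iterator> Pairs<I> {
    /// Constructing the wrapper never requires `I::Item: Clone`.
    fn new(iter: I) -> Self {
        Self { iter, previous: None }
    }
}

/// `Iterator` is only implemented when the items are `Clone`,
/// in the same way the `Stream` impl is bounded in my code.
impl<I: Iterator> Iterator for Pairs<I>
where
    I::Item: Clone,
{
    type Item = (I::Item, I::Item);

    fn next(&mut self) -> Option<Self::Item> {
        let current = self.iter.next()?;
        match self.previous.replace(current.clone()) {
            Some(previous) => Some((previous, current)),
            // First call: one more item is needed to form a window.
            None => {
                let next = self.iter.next()?;
                self.previous = Some(next.clone());
                Some((current, next))
            }
        }
    }
}

/// Hypothetical item type that deliberately does not implement `Clone`.
struct Opaque;

fn first_pair_of_ints() -> Option<(i32, i32)> {
    // `Opaque` is not `Clone`, yet building the wrapper still compiles.
    let _unusable = Pairs::new(vec![Opaque, Opaque].into_iter());
    // _unusable.next(); // rejected by the compiler: `Opaque: Clone` does not hold

    // With `Clone` items the wrapper is a working iterator.
    Pairs::new(vec![1, 2, 3].into_iter()).next()
}
```

Here the wrapper can be constructed for any item type, but `next` only becomes available once the item type is `Clone`. Whether that is what happens with `.enumerate()` in my stream example is something I have not confirmed.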
What am I missing here?