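I'm new to Rust and am writing a simple application that streams some values over gRPC using Tonic. The values are initially obtained from an external library as a BoxStream (Pin<Box<Stream>>), and tonic's API requires something that implements Stream (which, as far as I can tell, Pin does not).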
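Tonic's streaming example uses a ReceiverStream to convert an mpsc channel into a stream, and spawns a separate task to push values into it. That would require the stream to have a 'static lifetime, which is not an option for my actual implementation because the lifetime of my stream is tied to the class that returns it.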
What is the best way to give Tonic something that implements Stream, starting from my Pin<Box<Stream>>?
src/main.rs (this does not compile, because BoxStream<'static, Entry> does not implement IntoStreamingRequest)
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};

struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();

    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );
        for entry in entries {
            yield entry;
        }
    })
}
Cargo.toml
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"
The compile error is:
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
--> src\main.rs:12:26
|
12 | let request = stream.into_streaming_request();
| ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
|
::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
|
408 | pub struct Pin<P> {
| -----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sync`
|
::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
|
27 | pub trait Stream {
| ----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sized`
| doesn't satisfy `_: Sync`
|
= note: the following trait bounds were not satisfied:
`Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
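Going by the note at the end of this output, the unmet bound appears to be `Sync` rather than `Stream`: `BoxStream<'a, T>` is an alias for `Pin<Box<dyn Stream<Item = T> + Send + 'a>>`, which is `Send` but not `Sync`. Below is a minimal std-only sketch of that situation, not tonic's actual code. `Iterator` stands in for `Stream`, and the trait `IntoRequestLike` with its blanket impl is a hypothetical stand-in whose `Send + Sync + 'static` bounds are assumed from the error text.

```rust
// Same shape as the question's `Entry`.
struct Entry {
    key: String,
}

// Hypothetical stand-in for tonic's `IntoStreamingRequest` (an assumption,
// not the real trait). `Iterator` replaces `Stream` so only std is needed.
trait IntoRequestLike {
    fn into_items(self) -> Vec<Entry>;
}

// Blanket impl with the bounds the error output seems to ask for.
impl<T> IntoRequestLike for T
where
    T: Iterator<Item = Entry> + Send + Sync + 'static,
{
    fn into_items(self) -> Vec<Entry> {
        self.collect()
    }
}

// Boxed trait object that also promises `Sync`. If `+ Sync` is removed from
// this return type, the `into_items()` call in `main` fails with E0599,
// because the box is then `Send` but not `Sync`.
fn api_function_sync() -> Box<dyn Iterator<Item = Entry> + Send + Sync + 'static> {
    let entries = vec![
        Entry { key: String::from("value1") },
        Entry { key: String::from("value2") },
    ];
    Box::new(entries.into_iter())
}

fn main() {
    let items = api_function_sync().into_items();
    for entry in &items {
        println!("{}", entry.key);
    }
}
```

If the real trait has the same bounds, a boxed stream declared with `+ Sync` should be accepted, provided the underlying stream is itself `Sync`. Whether the output of `stream!` satisfies that is not checked here.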