1

在 warp (rust) 中,我如何将 a 传递std::sync::mpsc::Sender<T>给我的业务处理程序?以下代码抱怨std::sync::mpsc::Sender<T>不是Sync

我知道我可以my_sender用类似的东西来包装Arc<RwLock<Sender<T>>>,但这似乎不对,因为通道的目的是启用线程间通信。

我必须包装Sender<T>,还是有更好的方法来做到这一点?

use std;
use warp::{self, Filter};

fn with_sender<T: Send + Sync>(
    sender: std::sync::mpsc::Sender<T>,
) -> impl Filter<Extract = (std::sync::mpsc::Sender<T>,), Error = std::convert::Infallible> + Clone
{
    warp::any().map(move || sender.clone())
}

// later in main():
let (my_sender, my_receiver) = std::sync::mpsc::channel::<MySyncAndSendType>();

let routes = (warp::get()
    .and(warp::path!("some" / "path"))
    .map(my_handlers::handler_1)
    .map(turn_result_to_json)
    .or(warp::post()
        .and(warp::path!("some" / "other" / "path"))
        .and(warp::multipart::form().max_length(10 * 1024 * 1024))
        .and(with_sender(my_sender.clone()))
        // -------------------------------------------------------------------------
        // -------> I need my_sender in the function my_handlers::handler_2 <-------
        .and_then(|form, my_sender| async move {
            Ok::<_, Infallible>(my_handlers::handler_2(form, my_sender).await)
        })
        .map(turn_result_to_json)));

warp::serve(routes).run(([127, 0, 0, 1], 1234)).await;
4

1 回答 1

1

Senderis not Sync,所以它不能被共享,但是它is Send,所以你可以 move把它放到异步任务中。

您的代码的问题是闭包通过引用捕获其环境,即使async里面的块是move. 您也应该只需要关闭move

.and_then(move |form, my_sender| async move {
    Ok::<_, Infallible>(my_handlers::handler_2(form, my_sender).await)
})
于 2020-08-16T03:23:45.757 回答