4

我有一个数据源服务,它以观察者为参数。

void subscribe(Consumer onEventConsumer);

我想使用通量作为 RSocket 的响应流。我怎样才能做到这一点?正如我现在所看到的,它应该类似于

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

但是我很怀疑这是一个合适的解决方案,而且我是反应式方法的新手,我不知道我应该在这里使用什么方法?

4

2 回答 2

2

正如西蒙已经指出的那样,这就是您的用途Flux.create

查看projectreactor.io上的入门指南

在镜头中,您在create方法的 lambda 中注册了一个自定义侦听器:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

您要做的是将传入的元素传递给FluxSink,然后它将这些元素发布到 Flux 上。

于 2021-01-24T10:18:46.670 回答
0

这是 Flux.create 的一个典型用例。您从 create lambda 内部注册一个观察者,它将接收到的数据向下传递到提供的 FluxSink

于 2020-05-09T09:04:00.330 回答