1

假设我有一个简单的 RSocket 和 Spring Boot Server。服务器将所有传入的客户端消息广播到所有连接的客户端(包括发送者)。客户端和服务器如下所示:

服务器:

  public RSocketController() {
    this.processor = DirectProcessor.<String>create().serialize();
    this.sink = this.processor.sink();
  }

  @MessageMapping("channel")
  Flux<String> channel(final Flux<String> messages) {
    this.registerProducer(messages);
    // breakpoint here
    return processor
        .doOnSubscribe(subscription -> logger.info("sub"))
        .doOnNext(message -> logger.info("[Sent] " + message));
  }

  private Disposable registerProducer(Flux<String> flux) {
    return flux
        .doOnNext(message -> logger.info("[Received] " + message))
        .map(String::toUpperCase)
        // .delayElements(Duration.ofSeconds(1))
        .subscribe(this.sink::next);
  }

客户:

  @ShellMethod("Connect to the server")
  public void connect(String name) {
    this.name = name;
    this.rsocketRequester = rsocketRequesterBuilder
        .rsocketStrategies(rsocketStrategies)
        .connectTcp("localhost", 7000)
        .block();
  }

  @ShellMethod("Establish a channel")
  public void channel() {
    this.rsocketRequester
        .route("channel")
        .data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
        .retrieveFlux(String.class)
        .subscribe(message -> logger.info("[Received] {}", message));
  }

  @ShellMethod("Send a lower case message")
  public void send(String message) {
    this.fluxSink.next(message.toLowerCase());
  }

问题是:客户端发送的第一条消息由服务器处理,但没有再次到达发送者。所有后续消息都将毫无问题地传递。所有其他已连接的客户端将收到所有消息。

到目前为止我在调试时注意到的

  • 当我在客户端调用 channel() 时,会调用retrieveFlux() 和 subscribe()。但是在服务器上,断点并没有在相应的方法中触发。
  • 只有当客户端使用 send() 发送第一条消息时,才会在服务器上触发断点。
  • 在服务器上使用 .delayElements() 似乎可以“解决”问题。

我在这里做错了什么?为什么它首先需要 send() 来触发服务器断点?

提前致谢!

4

1 回答 1

1

ADirectProcessor没有缓冲区。如果它没有订阅者,则丢弃该消息。

(引用其Javadoc如果没有订阅者,则删除上游项目

我认为,当RSocketController.registerProducer()调用flux.[...].subscribe()它时,它会立即开始处理来自处理器的传入消息flux并将它们传递到处理器的接收器,但是对处理器的订阅还没有发生。因此消息被丢弃。

RSocketController.channel(...)我猜从方法返回后,对处理器的订阅是由框架完成的。-- 我认为你可以在你的processor.doOnSubscribe(..)方法中设置一个断点来查看它实际发生的位置。

因此,也许将registerProducer()调用转移到processor.doOnSubscribe()回调中会解决您的问题,如下所示:

  @MessageMapping("channel")
  Flux<String> channel(final Flux<String> messages) {
    return processor
        .doOnSubscribe(subscription -> this.registerProducer(messages))
        .doOnSubscribe(subscription -> logger.info("sub"))
        .doOnNext(message -> logger.info("[Sent] " + message));
  }

但我认为我个人更愿意将 a 替换DirectProcessorUnicastProcessor.create().onBackpressureBuffer().publish(). 因此,将向多个订阅者的广播转移到一个单独的操作中,以便在接收器和订阅者之间可以有一个缓冲区,并且可以以更好的方式处理延迟订阅者和背压。

于 2020-07-05T22:57:23.203 回答