假设我有一个简单的 RSocket 和 Spring Boot Server。服务器将所有传入的客户端消息广播到所有连接的客户端(包括发送者)。客户端和服务器如下所示:
服务器:
public RSocketController() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = this.processor.sink();
}
@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
this.registerProducer(messages);
// breakpoint here
return processor
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}
private Disposable registerProducer(Flux<String> flux) {
return flux
.doOnNext(message -> logger.info("[Received] " + message))
.map(String::toUpperCase)
// .delayElements(Duration.ofSeconds(1))
.subscribe(this.sink::next);
}
客户:
@ShellMethod("Connect to the server")
public void connect(String name) {
this.name = name;
this.rsocketRequester = rsocketRequesterBuilder
.rsocketStrategies(rsocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
@ShellMethod("Establish a channel")
public void channel() {
this.rsocketRequester
.route("channel")
.data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
.retrieveFlux(String.class)
.subscribe(message -> logger.info("[Received] {}", message));
}
@ShellMethod("Send a lower case message")
public void send(String message) {
this.fluxSink.next(message.toLowerCase());
}
问题是:客户端发送的第一条消息由服务器处理,但没有再次到达发送者。所有后续消息都将毫无问题地传递。所有其他已连接的客户端将收到所有消息。
到目前为止我在调试时注意到的
- 当我在客户端调用 channel() 时,会调用retrieveFlux() 和 subscribe()。但是在服务器上,断点并没有在相应的方法中触发。
- 只有当客户端使用 send() 发送第一条消息时,才会在服务器上触发断点。
- 在服务器上使用 .delayElements() 似乎可以“解决”问题。
我在这里做错了什么?为什么它首先需要 send() 来触发服务器断点?
提前致谢!