10

我有一个关于 Springs RSocketRequester 的问题。我有一个 rsocket 服务器和客户端。客户端连接到此服务器并请求 @MessageMapping 端点。它按预期工作。

但是,如果我重新启动服务器会怎样。如何从客户端自动重新连接到 rsocket 服务器?谢谢

服务器:

@Controller
class RSC {

    @MessageMapping("pong")
    public Mono<String> pong(String m) {
        return Mono.just("PONG " + m);
    }
}

客户:

@Bean
    public RSocketRequester rSocketRequester() {
        return RSocketRequester
                .builder()
                .connectTcp("localhost", 7000)
                .block();

    }

@RestController
class RST {

    @Autowired
    private RSocketRequester requester;

    @GetMapping(path = "/ping")
    public Mono<String> ping(){
        return this.requester
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}
4

2 回答 2

12

针对 Spring Framework 5.2.6+ 进行了更新

你可以用io.rsocket.core.RSocketConnector#reconnect.

@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rSocketRequesterBuilder) {
    return rSocketRequesterBuilder
            .rsocketConnector(connector -> connector
                    .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", 7000);
}
@RestController
public class RST {
    @Autowired
    private Mono<RSocketRequester> rSocketRequesterMono;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        return rSocketRequesterMono.flatMap(rSocketRequester ->
                rSocketRequester.route("pong")
                        .data("TEST")
                        .retrieveMono(String.class)
                        .doOnNext(System.out::println));
    }
}
于 2019-11-16T12:18:35.393 回答
6

我认为我不会RSocketRequester在应用程序中创建 bean。与WebClient(具有可重用连接池)不同,RSocket 请求者包装了一个 RSocket,即一个网络连接。

我认为最好存储一个Mono<RSocketRequester>并订阅它,以便在需要时获得一个实际的请求者。因为您不想为每个调用创建一个新连接,所以您可以缓存结果。感谢Mono retryXYZ运营商,您可以通过多种方式改进重新连接行为。

您可以尝试以下方法:

@Service
public class RSocketPingService {

    private final Mono<RSocketRequester> requesterMono;

    // Spring Boot is creating an auto-configured RSocketRequester.Builder bean
    public RSocketPingService(RSocketRequester.Builder builder) {
        this.requesterMono = builder
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connectTcp("localhost", 7000).retry(5).cache();
    }

    public Mono<String> ping() {
        return this.requesterMono.flatMap(requester -> requester.route("pong")
                .data("TEST")
                .retrieveMono(String.class));
    }


}
于 2019-11-14T19:36:46.557 回答