1

我有一个 spring boot rsocket 实现,如果客户端取消或关闭他们的 rsocket 请求,那么我想取消服务器上的其他订阅注册。

在 Spring Boot 服务器上的日志中,我可以看到发送或接收了一条取消消息:

WARN i.r.t.n.s.WebsocketServerTransport$1 [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

如何捕获和处理此取消信号?

我尝试取消端点,但这些没有捕获信号:

@MessageMapping("cancel")
Flux<Object> onCancel() {
    log.info("Captured cancel signal");
}

或者

@ConnectMapping("cancel")
Flux<Object> onCancel2() {
    log.info("Captured cancel2 signal");
}

这个关于取消订阅的问题可能是相关的,这个关于检测 websocket 断开连接的问题

4

2 回答 2

1

要捕获取消信号,您可以使用订阅onClose()事件。

在您的控制器中

@Controller
class RSocketConnectionController {

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientId)

                    //TODO here whatever you want
                })
    }
}

您的客户端需要正确发送 SETUP 帧才能调用 this @ConnectMapping。如果您使用rsocket-js,您需要添加这样的有效负载:

const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {
          //for connection mapping on server
          payload: {
            data: 'unique-client-id',   //TODO you can receive this data on server side
            metadata: String.fromCharCode("client-id".length) + "client-id"
          },
          // ms btw sending keepalive to server
          keepAlive: 60000,
.....
        }
});

于 2020-04-30T12:11:18.813 回答
1

这不是一个很好的问题。答案是

INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

FluxSink从原始@MessageMapping端点设置的 a 可以看到。

例如:

@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {       
    return myService.generateWorld(message);
}

myService课堂上

public Flux<Object> generateWorld(String hello) {
    EmitterProcessor<Object> emitter = EmitterProcessor.create();
    FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

    // doing stuff with sink here
    sink.next(stuff());

    // This part will handle a cancel from the client
    sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});

    return Flux.from(emitter));  
}

sink.onCancel()处理从客户端到端点的通量取消hello

于 2020-04-30T17:45:27.277 回答