3

假设我有这个用于聊天消息的简单 Websocket 处理程序:

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    webSocketSession
            .receive()
            .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
            .map(textMessage -> textMessageToGreeting(textMessage))
            .doOnNext(greeting-> greetingPublisher.push(greeting))
            .subscribe();
    final Flux<WebSocketMessage> message = publisher
            .map(greet -> processGreeting(webSocketSession, greet));
    return webSocketSession.send(message);
}

一般来说,这里需要做什么,因为它将使用rsocket协议?

4

1 回答 1

2

Spring WebFlux 中的 RSocket 控制器看起来更像 RestController 而不是 WebSocketHandler。所以上面的例子很简单:

@Controller
public class RSocketController {

    @MessageMapping("say.hello")
    public Mono<String> saHello(String name) {
        return Mono.just("server says hello " + name);
    }
}

这相当于requestResponse方法。

如果这个答案不满足您,请描述更多您想要实现的目标。

编辑

如果你想向所有客户端广播消息,他们需要订阅同一个 Flux。

public class GreetingPublisher {

    final FluxProcessor processor;
    final FluxSink sink;

    public GreetingPublisher() {
        this.processor = DirectProcessor.<String>create().serialize();
        this.sink = processor.sink();
    }

    public void addGreetings(String greeting) {
        this.sink.next(greeting);
    }

    public Flux<String> greetings() {
        return processor;
    }
}

@Controller
public class GreetingController{

    final GreetingPublisher greetingPublisher = new GreetingPublisher();

    @MessageMapping("greetings.add")
    public void addGreetings(String name) {
        greetingPublisher.addGreetings("Hello, " + name);
    }

    @MessageMapping("greetings")
    public Flux<String> sayHello() {
        return greetingPublisher.greetings();
    }
}

您的客户必须使用该方法调用greetings端点。requestStream无论您将消息发送到何处,greetingPublisher.addGreetings()它都会广播给所有客户端。

于 2020-04-27T19:12:20.703 回答