2

我一直在从(https://dzone.com/articles/rsocket-with-spring-boot-amp-js-zero-to-hero)运行一个示例 RSocket 项目,我试图存储每个客户端 ID 的请求者对象。

Rsocket JS客户端:

  client = new RSocketClient({
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {
      payload: {
        data: "121212",
        metadata: String.fromCharCode("client-id".length) + "client-id"
      },
      keepAlive: 60000,
      lifetime: 180000,
      dataMimeType: "application/json",
      metadataMimeType: "message/x.rsocket.routing.v0"
    },
    transport: new RSocketWebSocketClient({
      url: 'ws://localhost:8080/tweetsocket'
    }),
  });

  // Open the connection
  client.connect().subscribe({
    onComplete: socket => {
      // socket provides the rsocket interactions fire/forget, request/response,
      // request/stream, etc as well as methods to close the socket.
      socket.requestStream({
        data: {
          'author': document.getElementById("author-filter").value
        },
        metadata: String.fromCharCode('tweets.by.author'.length) + 'tweets.by.author',
      }).subscribe({
        onComplete: () => console.log('complete'),
        onError: error => {
          console.log(error);
          addErrorMessage("Connection has been closed due to ", error);
        },
        onNext: payload => {
          console.log(payload.data);
          reloadMessages(payload.data);
        },
        onSubscribe: subscription => {
          subscription.request(2147483647);
        },
      });
    },
    onError: error => {
      console.log(error);
      addErrorMessage("Connection has been refused due to ", error);
    },
    onSubscribe: cancel => {
      /* call cancel() to abort */
    }
  });
}

Rsocket Java 服务器:

@ConnectMapping(value = "client-id")
    public void onConnect(RSocketRequester rSocketRequester, @Payload String clientId) {
        logger.info(clientId);
        rSocketRequester.rsocket()
                .onClose() // (1)
//                .doFirst(() -> logger.info("Client Connected "))
//                .doOnError( error -> logger.info("Channel to client {} CLOSED", error)) // (3)
                .subscribe(null, null, () -> logger.info("herererer"));
    }

    @MessageMapping("tweets.by.author")
    public Flux<Tweet> getByAuthor(TweetRequest request) {
        return service.getByAuthor(request.getAuthor());
    }
    

我一直在调试模式下运行应用程序,无法触发onConnect ConnectMapping 注释方法。请帮助了解我在这里缺少什么。

4

2 回答 2

0

这里的问题可能是几件事:

  • 定义路由时最好使用复合元数据
  • 使用复合元数据时,使用元数据很IdentitySerializer重要

IdentitySerializer将按原样传递元数据的值,而不进行任何修改。这很重要,因为 for 的值metadata已经被编码,因为它应该来自encodeCompositeMetadata(...)

import {
    BufferEncoders,
    JsonSerializer,
    RSocketClient,
    APPLICATION_JSON,
    MESSAGE_RSOCKET_COMPOSITE_METADATA,
    encodeRoute, MESSAGE_RSOCKET_ROUTING, encodeCompositeMetadata, IdentitySerializer
} from "rsocket-core";
import RSocketWebSocketClient from "rsocket-websocket-client";

const client = new RSocketClient({
    serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
    },
    setup: {
        payload: {
            data: "121212",
            metadata: encodeCompositeMetadata([
                [MESSAGE_RSOCKET_ROUTING, encodeRoute("client-id")],
            ])
        },
        keepAlive: 60000,
        lifetime: 180000,
        dataMimeType: APPLICATION_JSON.string,
        metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
    },
    transport: new RSocketWebSocketClient({
        url: 'ws://localhost:8080/tweetsocket'
    }, BufferEncoders),
});
于 2021-12-08T05:58:46.280 回答
-1
@ConnectMapping
void onConnect(RSocketRequester requester) {
    Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
            .onClose()
            .doOnError(error -> log.warn(requester.rsocketClient() + " Closed"))
            .doFinally(consumer -> log.info(requester.rsocketClient() + " Disconnected"))
            .subscribe();
}
于 2021-11-21T19:47:49.733 回答