1

客户端代码:

   clientRSocket = RSocketConnector.create()
                    .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                    .payloadDecoder(PayloadDecoder.ZERO_COPY)
                    //.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
                    .dataMimeType(WellKnownMimeType.APPLICATION_JSON.toString())
                    .resume(new Resume().sessionDuration(getLocalConfig().getDuration("resumeDurationInMinutes"))
                            .retry(Retry.fixedDelay(Long.MAX_VALUE, getLocalConfig().getDuration("fixedDelayInSeconds")))
                    .streamTimeout(getLocalConfig().getDuration("resumeTimeOutInSeconds")))
                    .connect(TcpClientTransport.create(getLocalConfig().getInt("port")))
                    .doOnError(exception -> getLogger().error(DaeLogKeys.rsocket.failed.error, exception))
                    .block();

客户端流请求示例:

rsocket.requestStream(DefaultPayload.create(ByteBufAllocator.DEFAULT.buffer(),
                socket.route(List.of("part-type.getAll"))))
                .map(Payload::getDataUtf8)
                .map(doc->socket.decode(doc,DaeApiPartType.class))
                .collect(Collectors.toSet()).block())

服务器响应示例:

@MessageMapping("part-type.getAll")
    public Flux<DaeApiPartType> getAllPartTypes() {
        return controlPartType.findAll();
    }

似乎一切都很顺利。我看到客户端发布有效负载,但没有任何回复。没有错误。客户端卡住并等待答复。

在 1.0.1 这个例子有效。请指教。

4

0 回答 0