1

我尝试创建一个简单的 Spring Boot 应用程序来测试 Spring Integration 对 RSocket 的支持。

我将 spring-integration-rsocket 测试文件夹中的示例代码复制到了我的 Spring Boot 应用程序中。

pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

以及 application.properties:

spring.rsocket.server.transport=tcp
server.port=7000

主要应用程序类。

/**
 * Boot entry point for the demo application.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Launches the Spring application with this class as its primary source.
     *
     * @param args command-line arguments handed over to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) helper.
        new SpringApplication(DemoApplication.class).run(args);
    }

}

/**
 * Spring Integration wiring for the RSocket demo: a server connector, a client connector
 * pointed at the server's bound port, and the requesting and handling flows for the
 * {@code /uppercase} route.
 */
@Configuration
class DemoIntegrationConfig {

    /**
     * Creates the server-side connector for {@code localhost}, passing {@code 0} as the port.
     *
     * @return the server connector
     */
    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        ServerRSocketConnector connector = new ServerRSocketConnector("localhost", 0);
        return connector;
    }

    /**
     * Creates the client-side connector targeting the port reported by the server connector.
     *
     * @param serverRSocketConnector the server connector whose bound port is looked up
     * @return a client connector with auto-startup switched off
     */
    @Bean
    public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serverRSocketConnector) {
        // Blocks until the server connector publishes the port it is bound to.
        int boundPort = serverRSocketConnector.getBoundPort().block();
        ClientRSocketConnector connector = new ClientRSocketConnector("localhost", boundPort);
        connector.setAutoStartup(false);
        return connector;
    }

    /**
     * Requesting side: a {@link Function}-based gateway whose calls are sent to the
     * {@code /uppercase} route through the given client connector.
     *
     * @param clientRSocketConnector the connector used by the outbound gateway
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(
                        RSockets.outboundGateway("/uppercase")
                                .command(message -> RSocketOutboundGateway.Command.requestStreamOrChannel)
                                .expectedResponseType("T(java.lang.String)")
                                .clientRSocketConnector(clientRSocketConnector))
                .get();
    }

    /**
     * Handling side: receives requests on the {@code /uppercase} route and upper-cases each
     * element of the incoming stream.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow rsocketUpperCaseFlow() {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase"))
                .<Flux<String>, Flux<String>>transform(payloads -> payloads.map(text -> text.toUpperCase()))
                .get();
    }

}

和测试代码。

/**
 * Boots the application context and pushes a small stream through the
 * {@code rsocketUpperCaseRequestFlow} gateway.
 */
@SpringBootTest
class DemoApplicationTests {

    /** Gateway function selected by the {@code rsocketUpperCaseRequestFlow.gateway} qualifier. */
    @Autowired
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    /**
     * Sends three newline-terminated letters and expects "A", "B", "C" followed by completion.
     */
    @Test
    void testRsocketUpperCaseFlows() {
        Flux<String> request = Flux.just("a\n", "b\n", "c\n");
        Flux<String> response = this.rsocketUpperCaseFlowFunction.apply(request);

        StepVerifier.create(response)
                .expectNext("A", "B", "C")
                .verifyComplete();
    }
}

当我运行测试时,它抛出了异常:No handler for destination '/uppercase'(目标 '/uppercase' 没有对应的处理程序)。

2019-10-30 12:59:28.617  INFO 2800 --- [           main] com.example.demo.DemoApplicationTests    : Started DemoApplicationTests in 4.778 seconds (JVM running for 6.408)
org.springframework.messaging.MessageDeliveryException: No handler for destination '/uppercase'
        at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:309)
        at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)
        at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)
        at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply$4(MessagingRSocket.java:173)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
        at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
        at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
        at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
        at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
        at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
        at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
        at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
        at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.961 s <<< FAILURE! - in com.example.demo.DemoApplicationTests
[ERROR] testRsocketUpperCaseFlows  Time elapsed: 0.894 s  <<< FAILURE!

源代码托管在我的 GitHub 上。

顺便提一句:我必须将此应用程序作为嵌入式 RSocket 的 WebFlux 应用程序运行(或将其作为独立的 RSocket 应用程序运行)。如果我使用与 IntegrationConfig 中配置的相同端口运行 RSocket 服务器,则启动应用程序时会出现端口绑定错误,看来 ServerRSocketConnector 会创建自己的 RSocket 服务器。

更新:2020 年 2 月 27 日

我试图删除依赖项中的spring-boot-starter-rsocket并创建一个RestController与网关通信。

pom.xml 的更新依赖。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

并清理application.properties中的配置。

示例 RestController:


/**
 * HTTP endpoint that forwards a fixed stream of letters to the
 * {@code rsocketUpperCaseRequestFlow} gateway.
 */
@RestController
class HelloController {

    /** Lazily resolved gateway function selected by the qualifier below. */
    @Autowired
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    /**
     * Handles GET requests for {@code hello}, producing a {@code text/event-stream} response.
     *
     * @return whatever the gateway function returns for the inputs "a", "b", "c", "d"
     */
    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        Flux<String> letters = Flux.just("a", "b", "c", "d");
        return rsocketUpperCaseFlowFunction.apply(letters);
    }
}

当我运行应用程序时,我遇到了这样的异常。

org.springframework.messaging.MessageDeliveryException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
    at org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:389)
    at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:476)
    at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:444)
    at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$handleAndReply$4(MessagingRSocket.java:173)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstInner.onNext(FluxSwitchOnFirst.java:180)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
    at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:240)
    at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
    at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:427)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
    at io.rsocket.RSocketResponder.handleStream(RSocketResponder.java:446)
    at io.rsocket.RSocketResponder.handleChannel(RSocketResponder.java:502)
    at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:315)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)


java.lang.AssertionError: expectation "expectNext(A)" failed (expected: onNext(A); actual: onError(io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]))

    at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
    at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
    at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
    at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
    at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue$10(DefaultStepVerifierBuilder.java:501)
    at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2211)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1483)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1431)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1091)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:129)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:334)
    at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:232)
    at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
    at reactor.core.publisher.UnicastProcessor.onError(UnicastProcessor.java:401)
    at io.rsocket.RSocketRequester.handleFrame(RSocketRequester.java:556)
    at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:516)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: io.rsocket.exceptions.ApplicationErrorException: Destination '/uppercase' does not support REQUEST_CHANNEL. Supported interaction(s): [SETUP, METADATA_PUSH]
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:45)
        ... 37 more

4

1 回答 1

1

我刚刚使用您的所有代码从https://start.spring.io创建了一个新项目,它对我来说效果很好。我使用 Spring Boot 2.2.0

不过,我看到 RSocket 自动配置与我们现有的 ServerRSocketConnector 之间存在一些不一致。

我们需要以某种方式让 ServerRSocketConnector 基于 Spring Boot 中为 RSocket 支持所自动配置的内容来构建。欢迎就此事在 Spring Boot 中提交 issue!然后我们将尝试弄清楚必须在那里和/或在 Spring Integration 中实现什么以及如何实现。

于 2019-10-29T15:36:29.333 回答