0

我想创建一个 rsocket 通道,其中从服务器发送的数据可以是对客户端请求的反应或推送。我为此使用了通量合并。

它是参考数据:客户端可以要求刷新,服务器也可以推送更新。

所以我在服务器端有这个:

    @MessageMapping("update-stream")
    Flux<DomainObject> addUpdatesListener(Flux<RefreshRequest> requests) {
        Flux<DomainObject> pushFlux = Flux.from(this.flux)
            .doOnError((e) -> log.error("Error on push flux : {}", e, e));
        return requests
                .map(this::getUpdates)
                .flatMap(Flux::fromIterable)
                .doOnError((e) -> log.error("Error on channel flux : {}", e, e))
                .mergeWith(pushFlux)
                .doOnError((e) -> log.error("Error on merged flux : {}", e, e));
    }

它的工作原理是当我停止客户端时出现以下错误:

06-07-2020 15:58:53.168 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
    at reactor.core.publisher.FluxProcessor.dispose(FluxProcessor.java:80)
    at io.rsocket.core.RSocketResponder$3.hookOnCancel(RSocketResponder.java:513)
    at reactor.core.publisher.BaseSubscriber.cancel(BaseSubscriber.java:230)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:275)
    at io.rsocket.core.RSocketResponder.cleanup(RSocketResponder.java:265)
    at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:167)
    at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:160)
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:132)
    at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
    at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
    at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:60)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)  

如果我不进行合并,我没有错误。

我尝试了许多不同的版本,但我无法找到一种方法来让客户端退出时推送和无错误记录。

我错过了什么?

非常感谢。

4

1 回答 1

0

从 spring-boot 2.3.0.RELEASE 升级到 2.3.1.RELEASE 时问题消失。

于 2020-07-06T15:57:57.200 回答