我有一个多绑定器 spring 云流应用程序,我的实现与Retry With the RabbitMQ Binder的文档中解释的完全相似
我打算重试配置的次数并最终放弃。
但
如果错误类似于下面的错误,它甚至不会进入侦听器代码让我应用x-death计数逻辑。所以问题是我如何放弃反序列化和消息转换错误的消息:
2018-09-06 16:41:11.889 ERROR [data-connector,671345aea4270626,48dd1100d9e332f9,false] 8866 --- [rdedStarGroup-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.ducation.connector.event.domain.AwardResource] for GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[671345aea4270626], spanSampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}], failedMessage=GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[48dd1100d9e332f9], spanSampled=[0], X-B3-TraceId=[671345aea4270626], X-B3-SpanId=[48dd1100d9e332f9], X-B3-ParentSpanId=[671345aea4270626], spanParentSpanId=[671345aea4270626], X-B3-Sampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:165)
at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
2018-09-06 16:41:11.900 WARN [data-connector,,,] 8866 --- [rdedStarGroup-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.ducation.connector.event.domain.AwardResource] for GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[671345aea4270626], spanSampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:165) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
... 8 common frames omitted
注意:我知道错误本身及其引起的原因。有意测试x-death逻辑。
如果我的期望是不可能的,那么捕获此类异常以防止应用程序无限重试的最佳做法是什么?