我遵循了以下文档,并且我有一个生产者和消费者使用 Kinesis Stream 工作得很好。我想了解如何处理生产者(源)和消费者(处理器)中的错误,以防发生任何异常。
根据 Spring Stream 错误处理文档,我尝试了以下方法:
我尝试使用 @ServiceActivator("input-stream.input-stream-group.erros") - 这有效,但我的“输入流”是每个生产环境中的动态名称,根据我应该附加生产环境的策略定义数据流时的名称。这是首选方式,但如何解决这个问题?
我尝试过使用@ServiceActivator("errorChannel") - 这不起作用意味着如果我为此引入一种方法并放置一个记录器,则错误正在捕获和打印,但由于以下重新抛出而引发错误
org.springframework.cloud.stream.binding.StreamListenerMessageHandler
(第 53-68 行)catch (Exception e) { if (e instanceof MessagingException) { throw (MessagingException) e; } else { throw new MessagingException(requestMessage, "Exception thrown while invoking " + this.invocableHandlerMethod.getShortLogMessage(), e); } }
我已经自动装配了一个与“errorChannel”同名的 MessageChannel,并且在捕获异常时我已经准备了一条消息并发送给它,但在 ServiceActivator 方法中的行为与上面相同。
我该如何处理和解决这个问题?请建议并帮助我。