问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 如果在处理步骤中发生故障,如何使 Spring Cloud Stream Kafka Stream binder 重试处理消息?
我正在使用 Spring Cloud Stream 开发 Kafka Streams。在消息处理应用程序中,可能会产生错误。因此,不应再次提交并重试该消息。
我的申请方法——
这里如果DAO插入方法失败,消息不应该发布到输出主题,并且应该重试相同消息的处理。
我们如何配置 kafka 流绑定器来做到这一点?非常感谢您对此的任何帮助。
apache-kafka - 如何在 Spring Cloud Kafka Streams 应用程序中执行 flatTransform?
我正在尝试flatTransform
在 Spring Cloud Kafka Streams 应用程序中执行。但我不确定将KafkaStreamsStateStore
注释放在哪里。目前我收到错误:Invalid topology: StateStore activeInstruments is not added yet
. 如果有人能给我一些指导,我将不胜感激。
java - 如何处理 Spring 云流 kafka 流活页夹中的序列化错误?
我正在使用 Spring 云流 kafka 流活页夹编写一个 Kafka 流应用程序。
当消费者将消息发布到输出主题时,可能会出现类似Serialization error或Network error的错误。
在这段代码中 -
这里在产生错误的同时将消息返回到输出主题,如果发生错误,如何处理它。除了RetryTemplate之外,Kafka 流绑定器中是否有任何机制?
spring-cloud-stream - 了解 Spring Cloud Stream Kafka 和 Spring Retry
我有一个使用 Kafka 活页夹的 Spring Cloud Stream 项目,我正在尝试理解并最终自定义 Cloud Stream 使用的 RetryTemplate。
我没有找到很多关于它是如何工作的文档,但我读过的内容让我做出了以下假设:
- Cloud Stream 默认配置并启用 Spring Retry,包括默认的重试和退避策略。
- 默认情况下,任何未捕获的异常
@StreamListener
都会触发 Spring Retry - Cloud Stream 将以某种方式跟踪每条消息的 RetryContext 信息(如何?我不确定)
这些假设是否正确?
现在,在我的应用程序中,我有一个模式可以立即处理一些消息,但必须推迟其他消息稍后再试(使用指数退避等)。
我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?
如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?
spring-cloud-stream - Spring Cloud Stream - 修改 DLQ 消息
我将 Spring Cloud Stream 的 DLQ 功能与 Kafka 活页夹一起使用。当消息处理失败时,消息会按预期发送到 DLQ,但是,我希望能够修改发送到 DLQ 的消息以包含一些额外的诊断信息。问题是发送到DLQ的消息是原始消息;我所做的任何突变都会被忽略。到目前为止,我解决这个问题的方法是在消息发送到 DLQ 之前拦截消息,并添加存储在另一个 bean 中的额外信息。具体来说,我尝试了这两种方法:
- 解决方案:为 DLQ 实现一个普通的 Kafka
ProducerInterceptor
。问题:实现是在 Spring 上下文之外实例化的,所以我不能注入我需要的其他 bean。Spring Kafka 已经记录了这个解决方案,但是,它需要创建一个新的ProducerFactory
bean,这意味着我不能使用底层 Spring Cloud Stream 中的 bean。 - 解决方案:实现一个Spring
ChannelInterceptor
。问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。
关于如何解决这个问题的任何想法?
scala - Spring Cloud Data Flow 自定义 Scala 处理器无法从 Starter Apps 发送/接收数据(SCDF 2.5.1 和 Spring Boot 2.2.6)
我一直致力于在 Scala 中为 Spring Cloud Data Flow 创建一个简单的自定义处理器,并且遇到了从/向启动应用程序发送/接收数据的问题。我一直无法看到通过流传播的任何消息。流的定义是我的自定义处理器time --trigger.time-unit=SECONDS | pass-through-log | log
在哪里。pass-through-log
我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。
这是用于处理器的代码 - 我正在使用功能模型。
应用程序.yml
构建.gradle.kts
如果此处缺少代码示例,我已将整个项目发布到github 。我还在那里发布了日志,因为它们很长。
当我引导本地 Kafka 集群并将任意数据推送到input
主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在 Kubernetes 中通过 Docker 部署应用程序。
此外,当我使用定义部署流时time --trigger.time-unit=SECONDS | log
,我会在日志接收器中看到消息。这让我确信问题出在定制处理器上。
我是否缺少一些简单的东西,例如依赖项或额外配置?任何帮助是极大的赞赏。
apache-kafka - spring cloud stream kafka - 功能方法:生产墓碑记录时从不调用消费者
这个问题与https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455几乎相同,但对于Functional
方法
当产生正确的 'Foo' (json) 时,调用 fooConsumer。但是,当产生“空”有效负载/墓碑时,永远不会调用使用者函数。
解决方法我尝试使用有效的自定义“转换器”方法:
然后fooConsumer
可以检查payload.isDeleted()
处理null
情况。但这是冗长的,会污染 pojo/model 类,并且必须为每个消费者重复。
我知道 spring-integration 不能与 null 一起使用。但是有没有更好/标准的方法来处理使用功能方法的“墓碑”用例?
版本:3.0.4.RELEASE
reactive-programming - mongoDB在不同线程上调用时插入两次
基本上我正在使用来自 spring cloud stream kafka 的消息并将其插入到 MongoDB 如果我的 mongo 集群启动,我的代码工作正常我有 2 个问题如果我的 Mongo 实例关闭
- 云流的自动提交已禁用(autoCommitOffset 设置为 false),即使尚未确认消息,也不会发生重新轮询
- 在检查 Mongo 连接时,如果它收到两个具有相同 ID 的消息,则在该时间段内需要一些时间,然后如果我启动 mongo 实例,它会复制在正常情况下工作正常的消息
我们有解决这些问题的办法吗?
这是我的代码,
}
这是我的服务班
这是我的 application.yml
编辑 1. 确认为空
java - Kafka Streams 和写入状态存储
我正在开发一个使用 Spring Cloud Stream 构建的 Kafka Streams 应用程序。在这个应用程序中,我需要:
- 使用可在以后检索的连续消息流。
- 保留与某些条件匹配的消息 ID 列表。
- 在一个单独的线程中,运行一个调度程序,它会定期读取消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。
- 从列表中删除已处理的消息 ID,以免重复工作。
我考虑过如下实施:
- 将传入的消息流作为具体化的 KTable 使用,以便我以后可以通过键查找和检索消息。
- 在另一个状态存储中实现消息 ID 列表。
InteractiveQueryService
使用 Spring 的调度机制运行一个单独的线程,该线程通过bean从状态存储中读取。
我遇到的问题是InteractiveQueryService
提供了对状态存储的只读访问权限,因此我无法删除另一个线程中的条目。我决定不使用 Kafka Stream 的 punctuate 功能,因为语义不同;我的调度线程必须始终定期运行,而不管入站消息的处理。
另一种选择可能是使用低级处理器 API,并将对可写状态存储的引用传递给我的调度程序线程。我需要同步写操作。但是我不确定这是否可行,或者在从单独的线程访问这样的状态存储时是否存在其他限制。
任何意见或建议将不胜感激!
spring-boot - Spring Cloud Stream Kafka 多重绑定
我正在使用 Spring Cloud Stream Kafka binder 来使用来自 Kafka 的消息。我可以使我的示例与单个 Kafka Binder 一起工作,如下所示
请注意,这两个绑定都指向同一个 Kafka Broker。但是,我有一种情况,我需要发布到某个 Kafka 集群中的一个主题,并且还需要从另一个 Kafka 集群中的另一个主题消费。我应该如何更改我的配置才能绑定到不同的 Kafka 集群?
我尝试过这样的事情
和
但是他们两个似乎都不起作用。第一个配置似乎无效。对于第二个配置,我收到以下错误
我正在使用依赖项 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE' 和 Spring Boot 2.2.6
请让我知道如何使用 Spring Cloud Stream 为 Kafka 配置多个绑定
更新
在下面尝试了此配置
消息流和 EventHubBinding 如下
我的 Producer 类如下所示
消费者类如下所示
尝试发布和使用测试中的消息时出现以下错误。
我错过了什么吗?对于单个集群,我能够发布和使用消息。该问题仅发生在多个集群绑定中