问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 允许禁用云流的配置?
我有一个 Spring Boot 应用程序,它有两个功能 Http 请求和 kafka 消息处理。我希望此应用程序以从 application.yml 启用的模式运行,即如果用户只想为 http 请求启用它,则不应连接 kafka。
我可以通过使用@KafkaListener 的以下属性禁用自动配置,使用普通的 spring boot kafka 插件来实现这一点,
autoStartup="${module.put:false}"
现在我们正在尝试移动到云流,我发现通过删除云流和活页夹的库来禁用它的唯一方法。有没有更好的方法使用具有自动配置模式的属性来禁用它,或者是否有任何手动配置选项可用?
java - 如何在 Spring Cloud Stream Kafka 绑定中编写订阅主题的方法?
我想编写一个使用 KafkaListener 订阅主题的消费者方法。
我找到了一个建议这个答案 -
现在,注释中的“主题”是订阅主题的名称。但什么是“id”字段?或者,还有更好的方法?
感谢您的帮助。
spring-boot - 使用 Spring Cloud Stream Kafka binder 重复消费消息
我们有几个使用 Spring Boot 和 Spring Cloud Stream Kafka binder 的微服务在它们之间进行通信。
有时,我们会观察到消费者收到的重复消息的爆发——通常是在首次消费和处理(成功)之后的几天。
虽然我知道 Kafka 不保证只交付一次,但它看起来仍然很奇怪,因为在经纪人和服务的日志中没有重新平衡事件或任何“可疑”活动。由于消费者正在与外部 API 进行交互,因此使其具有幂等性有点困难。
任何提示可能是重复的原因?我应该寻找什么来解决这个问题?
我们使用的是 Kafka 代理 1.0.0,而这个特定的消费者使用 Spring Cloud Stream Binder Kafka 2.0.0,它基于 kafka-client 1.0.2(其他服务的版本可能有点不同)。
apache-kafka - 在 Spring Cloud Kafka Streams 中序列化/反序列化泛型类型
主要目的是从一个主题中读取一个流,应用一些转换,然后将两个事件发送到其他主题。为此,我们使用 Kstream.branch() 函数并使用函数式编程。代码是:
输入 POJO:
输出 POJO:
处理器功能:
绑定在 appplication.properties 中声明:
输入:
输出:
问题在于应用程序何时评估谓词。它似乎试图转换为FooEvent<Bar>
. 它可以很好地转换eventId
, eventTime
, eventAction
, ... 字段,但是当涉及到entity
字段(在这种情况下Bar
)时,它将值存储在 HashMap 上(而不是创建新Bar
对象并设置正确的字段),这让我相信Spring 默认 Serde (JsonSerde) 做错了什么。有关如何解决 Kafka Streams 中的泛型类型 Serde 问题的任何建议?
spring-boot - 如何使用 spring-cloud-stream-binder-kafka 和 RetryTemplate 启用有状态重试?
我想知道是否有一种方法可以使用 spring-cloud-stream-binder-kafka 启用 Stateful RetryTemplate。
我注意到有一个构造函数
通过这段代码进行调试,我注意到布尔值是使用 spring-cloud-stream-binder-kafkafalse
传递给参数的。stateful
相关链接:KafkaMessageDrivenChannelAdapter.java、RetryingMessageListenerAdapter.java
我有几个关于这个话题的问题。
- 有没有办法通过 application.yml 或将
RetryingMessageListenerAdapter
's member设置stateful
为 trueListenerContainerCustomizer
? - 有没有办法禁用
RetryTemplate
使用 spring-cloud-stream-binder-kafka?我认为我发现的最接近的是max-attempts
在我的@StreamRetryTemplate
. - 如果我使用的是 spring-kafka 提供的,那么我应该设置为 true
SeekToCurrentErrorHandler
有什么好处或理由,因为它已经是一个有状态的错误处理机制?stateful
SeekToCurrentErrorHandler
请告诉我。
spring-cloud-stream - 响应式 Spring Cloud Stream Kafka 中的消息被忽略
我有一个 spring 云应用程序,使用 spring 响应式核心监听两个主题,每个主题有 10 个分区。
在消费者中,我只是在阅读消息并打印主题、分区和偏移量,有些消息没有被读取。
我已经尝试过自动提交和手动确认。
测试设置:将 30K 消息推送到每个主题 Topic 1 和 Topic 2 并启动应用程序,它只读取 59999 条记录而不是 60000 条。
所有主题的所有分区中的滞后为 0,表明所有数据都已消耗。
- 收货人代码:
我的 Application.yml 包含以下信息
我的日志文件显示消费者没有读到 Topic: Topic1
partition: 2
offset ,它从to76031
跳转,76030
76032
的相关部分pom.xml
:
apache-kafka - Spring Stream 侦听器轮询卡住
我们有以 BATCH 模式运行的 Spring Stream Listener。每个批处理的处理时间约为 3 ms。以下是我们的配置:
我们看到以下行为
如上所示,在我们收到下一组消息之前,我们看到了 5 分钟的间隔;这种模式仍在继续。
我们在 Kafka 分区中有大量消息等待处理。因此,不乏现成的消息。
不知道为什么我们反复获得 5 分钟的静默 - 我们将最大轮询等待/轮询间隔设置为 5 分钟。这应该没问题,因为 500 的最大轮询记录可以在轮询时立即得到满足。
获取最小字节为 2 MB,最大字节为 5 MB;这又可以通过我们拥有的消息数量来满足。
请让我知道我错过了什么。
java-8 - 如何创建一个以集合对象为值的 KStream?
我正在研究 Spring 云流 kafka 流活页夹。在我的消费者 bean 方法中,我想以字符串列表作为值返回 KStream -
是否可以拥有一个以集合为值的 KStream?如果是这样,谁能解释如何创建它?
java - Confluent Cloud Schema Registry Unauthorized错误的spring-cloud-stream-binder-kafka配置
使用spring-cloud-stream-binder-kafka
. 可能有人可以看到什么是错的?
当我使用https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/中的示例时, 它工作正常,我可以在 Confluent Cloud 上看到消息
但是,当使用配置添加相同的连接详细信息时spring-cloud-stream-binder-kafka
,它会返回未经授权的错误。
我的下面的配置给出了上述错误。不知道出了什么问题?
Confluent 中运行良好的示例:
apache-kafka - 当处理每条消息需要很长时间时,具有 6 个 Kafka 消费者的最大吞吐量策略
考虑这种情况:具有 6 个分区的 Kafka 主题。具有 6 个副本的 Spring Java Kafka 消费者应用程序,以便每个副本处理其中一个分区。
我面临的问题是消费者中每条消息的处理需要很长时间(约 20 秒),因为它需要调用一个非常慢的外部系统。
因此,即使我提供了 6 个分区/副本,我最终还是遇到了瓶颈,即 6 个消费者每条消息阻塞约 20 秒,这意味着每 20 秒的吞吐量为 6 条消息!
考虑到我无法修改外部系统的行为,您能否建议加快这种情况的方法?