问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1624 浏览

apache-kafka - 允许禁用云流的配置?

我有一个 Spring Boot 应用程序,它有两个功能 Http 请求和 kafka 消息处理。我希望此应用程序以从 application.yml 启用的模式运行,即如果用户只想为 http 请求启用它,则不应连接 kafka。

我可以通过使用@KafkaListener 的以下属性禁用自动配置,使用普通的 spring boot kafka 插件来实现这一点,

autoStartup="${module.put:false}"

现在我们正在尝试移动到云流,我发现通过删除云流和活页夹的库来禁用它的唯一方法。有没有更好的方法使用具有自动配置模式的属性来禁用它,或者是否有任何手动配置选项可用?

0 投票
1 回答
125 浏览

java - 如何在 Spring Cloud Stream Kafka 绑定中编写订阅主题的方法?

我想编写一个使用 KafkaListener 订阅主题的消费者方法。

我找到了一个建议这个答案 -

现在,注释中的“主题”是订阅主题的名称。但什么是“id”字段?或者,还有更好的方法?

感谢您的帮助。

0 投票
1 回答
428 浏览

spring-boot - 使用 Spring Cloud Stream Kafka binder 重复消费消息

我们有几个使用 Spring Boot 和 Spring Cloud Stream Kafka binder 的微服务在它们之间进行通信。

有时,我们会观察到消费者收到的重复消息的爆发——通常是在首次消费和处理(成功)之后的几天。

虽然我知道 Kafka 不保证只交付一次,但它看起来仍然很奇怪,因为在经纪人和服务的日志中没有重新平衡事件或任何“可疑”活动。由于消费者正在与外部 API 进行交互,因此使其具有幂等性有点困难。

任何提示可能是重复的原因?我应该寻找什么来解决这个问题?

我们使用的是 Kafka 代理 1.0.0,而这个特定的消费者使用 Spring Cloud Stream Binder Kafka 2.0.0,它基于 kafka-client 1.0.2(其他服务的版本可能有点不同)。

0 投票
0 回答
212 浏览

apache-kafka - 在 Spring Cloud Kafka Streams 中序列化/反序列化泛型类型

主要目的是从一个主题中读取一个流,应用一些转换,然后将两个事件发送到其他主题。为此,我们使用 Kstream.branch() 函数并使用函数式编程。代码是:

输入 POJO:

输出 POJO:

处理器功能:

绑定在 appplication.properties 中声明:

输入:

输出:

问题在于应用程序何时评估谓词。它似乎试图转换为FooEvent<Bar>. 它可以很好地转换eventId, eventTime, eventAction, ... 字段,但是当涉及到entity字段(在这种情况下Bar)时,它将值存储在 HashMap 上(而不是创建新Bar对象并设置正确的字段),这让我相信Spring 默认 Serde (JsonSerde) 做错了什么。有关如何解决 Kafka Streams 中的泛型类型 Serde 问题的任何建议?

0 投票
1 回答
276 浏览

spring-boot - 如何使用 spring-cloud-stream-binder-kafka 和 RetryTemplate 启用有状态重试?

我想知道是否有一种方法可以使用 spring-cloud-stream-binder-kafka 启用 Stateful RetryTemplate。

我注意到有一个构造函数

通过这段代码进行调试,我注意到布尔值是使用 spring-cloud-stream-binder-kafkafalse传递给参数的。stateful

相关链接:KafkaMessageDrivenChannelAdapter.javaRetryingMessageListenerAdapter.java

我有几个关于这个话题的问题。

  1. 有没有办法通过 application.yml 或将RetryingMessageListenerAdapter's member设置stateful为 true ListenerContainerCustomizer
  2. 有没有办法禁用RetryTemplate使用 spring-cloud-stream-binder-kafka?我认为我发现的最接近的是max-attempts在我的@StreamRetryTemplate.
  3. 如果我使用的是 spring-kafka 提供的,那么我应该设置为 trueSeekToCurrentErrorHandler有什么好处或理由,因为它已经是一个有状态的错误处理机制?statefulSeekToCurrentErrorHandler

请告诉我。

0 投票
1 回答
1222 浏览

spring-cloud-stream - 响应式 Spring Cloud Stream Kafka 中的消息被忽略

我有一个 spring 云应用程序,使用 spring 响应式核心监听两个主题,每个主题有 10 个分区。

在消费者中,我只是在阅读消息并打印主题、分区和偏移量,有些消息没有被读取。

我已经尝试过自动提交手动确认

测试设置:将 30K 消息推送到每个主题 Topic 1 和 Topic 2 并启动应用程序,它只读取 59999 条记录而不是 60000 条。

所有主题的所有分区中的滞后为 0,表明所有数据都已消耗。

  1. 收货人代码:

我的 Application.yml 包含以下信息

我的日志文件显示消费者没有读到 Topic: Topic1partition: 2offset ,它从to76031跳转,7603076032

的相关部分pom.xml

0 投票
1 回答
194 浏览

apache-kafka - Spring Stream 侦听器轮询卡住

我们有以 BATCH 模式运行的 Spring Stream Listener。每个批处理的处理时间约为 3 ms。以下是我们的配置:

我们看到以下行为

如上所示,在我们收到下一组消息之前,我们看到了 5 分钟的间隔;这种模式仍在继续。

我们在 Kafka 分区中有大量消息等待处理。因此,不乏现成的消息。

不知道为什么我们反复获得 5 分钟的静默 - 我们将最大轮询等待/轮询间隔设置为 5 分钟。这应该没问题,因为 500 的最大轮询记录可以在轮询时立即得到满足。

获取最小字节为 2 MB,最大字节为 5 MB;这又可以通过我们拥有的消息数量来满足。

请让我知道我错过了什么。

0 投票
1 回答
752 浏览

java-8 - 如何创建一个以集合对象为值的 KStream?

我正在研究 Spring 云流 kafka 流活页夹。在我的消费者 bean 方法中,我想以字符串列表作为值返回 KStream -

是否可以拥有一个以集合为值的 KStream?如果是这样,谁能解释如何创建它?

0 投票
1 回答
778 浏览

java - Confluent Cloud Schema Registry Unauthorized错误的spring-cloud-stream-binder-kafka配置

使用spring-cloud-stream-binder-kafka. 可能有人可以看到什么是错的?

当我使用https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/中的示例时, 它工作正常,我可以在 Confluent Cloud 上看到消息

但是,当使用配置添加相同的连接详细信息时spring-cloud-stream-binder-kafka,它会返回未经授权的错误。

我的下面的配置给出了上述错误。不知道出了什么问题?

Confluent 中运行良好的示例:

0 投票
1 回答
148 浏览

apache-kafka - 当处理每条消息需要很长时间时,具有 6 个 Kafka 消费者的最大吞吐量策略

考虑这种情况:具有 6 个分区的 Kafka 主题。具有 6 个副本的 Spring Java Kafka 消费者应用程序,以便每个副本处理其中一个分区。

我面临的问题是消费者中每条消息的处理需要很长时间(约 20 秒),因为它需要调用一个非常慢的外部系统。

因此,即使我提供了 6 个分区/副本,我最终还是遇到了瓶颈,即 6 个消费者每条消息阻塞约 20 秒,这意味着每 20 秒的吞吐量为 6 条消息!

考虑到我无法修改外部系统的行为,您能否建议加快这种情况的方法?