问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
397 浏览

java - 有没有办法使用 Spring Cloud Function 方法从主题中轮询消息?

我们使用SubscribableChannel,MessageChannelPollableMessageSource配置使用@EnableBindingand @StreamListner。现在我们需要迁移到函数式方法。SubscribableChannel并且MessageChannel可以使用 Consumer 和 Supplier bean 进行转换,但我们无法 migrate PollableMessageSource。由于如果我们使用,功能会被禁用@EnableBinding,因此我们被困在迁移中。我们尝试调查,@PollableBean但似乎它仅适用于具有有限流的 Reactive Supplier。有什么办法吗?参考:https ://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html

0 投票
1 回答
374 浏览

apache-kafka - Spring Cloud Stream 以编程方式创建和删除主题

我有一个要求,我需要以编程方式创建和删除 Kafka 主题。org.springframework.cloud.stream.binding.BinderAwareChannelResolver. resolveDestination(String channelName)可以创建主题,但不推荐使用。此外,我知道无法以编程方式删除 Kafka 主题。

欢迎任何实现这一目标的建议!

0 投票
1 回答
67 浏览

java - Spring Cloud Kinesis Binder 如何为生产者处理错误 - 根据文档,它不起作用

我遵循了以下文档,并且我有一个生产者和消费者使用 Kinesis Stream 工作得很好。我想了解在发生任何异常时如何处理生产者(源)中的错误。

根据 Spring Stream 错误处理文档,我尝试了以下方法:

两者都不起作用。我在生产者方法中显式抛出 RuntimeException 并期望它将在“errorChannel”中但无法接收到相同的。

如果有人成功做到这一点,请帮助我弄清楚这一点或与我分享方法。

0 投票
1 回答
66 浏览

apache-kafka - 即使没有启动记录的主题压缩,GlobalKTable 是否总是每个分区键只有 1 条记录?

我有一个通过设置 cleanup.policy=compact 启用压缩的 Kafka 主题。我的 segment.bytes 属性设置为稍大的值 (100Mb),以便我的经纪人表现良好。

如果我有一个将主题用作 GlobalKTable 的 Kafka 流应用程序,并且主题中的单个分区键有多个记录,那么该应用程序会在 GlobalKTable 中仅接收 1 条记录,还是在压缩开始之前将有两条记录?

0 投票
1 回答
381 浏览

spring-cloud-stream - Spring Cloud Stream 和 Spring RetryTemplate 处理嵌套异常

我有一个使用 Kafka 活页夹的 Spring Cloud Stream 项目,我想添加重试功能,我正在尝试使用 RetryTemplate 并指定我想要处理的某些异常,但由于任何异常都被 MessageTransformationException 包装,我无法配置它以我想要的方式。有没有办法处理嵌套异常?

重试模板

堆栈跟踪

所以它忽略了我为 MyException 设置的配置

0 投票
0 回答
83 浏览

java - Spring Cloud Stream 3.0.7 测试逻辑

作为一个刚接触 Spring 但具有流处理背景的人,我对如何测试用 Spring Cloud Stream 编写的处理器感到非常困惑。测试文档(为 2.2.0 编写,但似乎是最新的,所以我猜对 3.0.7 仍然有效?)表明处理器应该通过注释自动装配到测试类中,但没有提到这个“处理器”在哪里豆来自。我试图通过以下方式提供它:

但是在运行测试时,异常总是相同的:

我什至在正确的道路上吗?在过去的一年中似乎有很多不推荐使用的功能,因此任何指向更新文档的指针都将不胜感激。

0 投票
1 回答
298 浏览

java - Kafka 流 GlobalKTable 在 Tombstone 上引发反序列化异常-空值-记录

我有一个基于 Spring 云流的 Kafka Streams 应用程序,我在其中将 Global KTable 绑定到 Compact 主题。当我将 Tombstone 记录推送到主题(具有空值的非空键)时 - 我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。

从文档中,我认为 GlobalKTable 甚至不会“看到”空值记录。不是这样吗?我需要在反序列化器中处理空记录吗?

0 投票
1 回答
171 浏览

java - 使用spring cloud stream kafka读取消息的编程方式

我有一个与之相关的主题和 DLQ。我正在使用@StreamListener 作为主题。我想使用控制器端点按需读取/处理来自 DLQ 的消息。

是否可以使用 Spring Cloud Stream Kafka 来做到这一点。

我们没有在生产中使用执行器。所以不能使用 /bindings 端点。

0 投票
1 回答
172 浏览

spring-cloud-stream - Spring Cloud Stream Kafka 异常处理

我正在开发一个事件导致spring数据存储库保存数据的应用程序;

此代码可以抛出各种异常,例如 DataIntegrityViolationException ,它是运行时异常。

我的问题是如何

  • 处理此类异常并
  • 生成带有有效负载的消息导致此错误
  • 有例外,
  • 允许生产者采取行动。
0 投票
1 回答
425 浏览

apache-kafka - Google Functons 通过 spring-cloud-stream 和 spring-cloud-functions 从 Kafka 接收消息

我知道使用 Kafka Connect 您可以创建连接器将所有 kafka 消息接收到 PUBSUB 主题。

我想知道我是否可以将 Spring Cloud Stream 绑定器与 Spring Cloud Function 结合使用并将所有这些部署到 Google Functions。

在我理解正确的情况下,我可以结合 Spring Cloud Stream 和 Spring Cloud Function。这是否意味着我可以使用 Spring Cloud Stream 的 binder 来实际接受来自 Kafka 的消息?