问题标签 [spring-cloud-stream-binder-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 有没有办法使用 Spring Cloud Function 方法从主题中轮询消息?
我们使用SubscribableChannel
,MessageChannel
和PollableMessageSource
配置使用@EnableBinding
and @StreamListner
。现在我们需要迁移到函数式方法。SubscribableChannel
并且MessageChannel
可以使用 Consumer 和 Supplier bean 进行转换,但我们无法 migrate PollableMessageSource
。由于如果我们使用,功能会被禁用@EnableBinding
,因此我们被困在迁移中。我们尝试调查,@PollableBean
但似乎它仅适用于具有有限流的 Reactive Supplier。有什么办法吗?参考:https ://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html
apache-kafka - Spring Cloud Stream 以编程方式创建和删除主题
我有一个要求,我需要以编程方式创建和删除 Kafka 主题。org.springframework.cloud.stream.binding.BinderAwareChannelResolver
. resolveDestination(String channelName)
可以创建主题,但不推荐使用。此外,我知道无法以编程方式删除 Kafka 主题。
欢迎任何实现这一目标的建议!
java - Spring Cloud Kinesis Binder 如何为生产者处理错误 - 根据文档,它不起作用
我遵循了以下文档,并且我有一个生产者和消费者使用 Kinesis Stream 工作得很好。我想了解在发生任何异常时如何处理生产者(源)中的错误。
根据 Spring Stream 错误处理文档,我尝试了以下方法:
两者都不起作用。我在生产者方法中显式抛出 RuntimeException 并期望它将在“errorChannel”中但无法接收到相同的。
如果有人成功做到这一点,请帮助我弄清楚这一点或与我分享方法。
apache-kafka - 即使没有启动记录的主题压缩,GlobalKTable 是否总是每个分区键只有 1 条记录?
我有一个通过设置 cleanup.policy=compact 启用压缩的 Kafka 主题。我的 segment.bytes 属性设置为稍大的值 (100Mb),以便我的经纪人表现良好。
如果我有一个将主题用作 GlobalKTable 的 Kafka 流应用程序,并且主题中的单个分区键有多个记录,那么该应用程序会在 GlobalKTable 中仅接收 1 条记录,还是在压缩开始之前将有两条记录?
spring-cloud-stream - Spring Cloud Stream 和 Spring RetryTemplate 处理嵌套异常
我有一个使用 Kafka 活页夹的 Spring Cloud Stream 项目,我想添加重试功能,我正在尝试使用 RetryTemplate 并指定我想要处理的某些异常,但由于任何异常都被 MessageTransformationException 包装,我无法配置它以我想要的方式。有没有办法处理嵌套异常?
重试模板
堆栈跟踪
所以它忽略了我为 MyException 设置的配置
java - Spring Cloud Stream 3.0.7 测试逻辑
作为一个刚接触 Spring 但具有流处理背景的人,我对如何测试用 Spring Cloud Stream 编写的处理器感到非常困惑。测试文档(为 2.2.0 编写,但似乎是最新的,所以我猜对 3.0.7 仍然有效?)表明处理器应该通过注释自动装配到测试类中,但没有提到这个“处理器”在哪里豆来自。我试图通过以下方式提供它:
但是在运行测试时,异常总是相同的:
我什至在正确的道路上吗?在过去的一年中似乎有很多不推荐使用的功能,因此任何指向更新文档的指针都将不胜感激。
java - Kafka 流 GlobalKTable 在 Tombstone 上引发反序列化异常-空值-记录
我有一个基于 Spring 云流的 Kafka Streams 应用程序,我在其中将 Global KTable 绑定到 Compact 主题。当我将 Tombstone 记录推送到主题(具有空值的非空键)时 - 我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。
从文档中,我认为 GlobalKTable 甚至不会“看到”空值记录。不是这样吗?我需要在反序列化器中处理空记录吗?
java - 使用spring cloud stream kafka读取消息的编程方式
我有一个与之相关的主题和 DLQ。我正在使用@StreamListener 作为主题。我想使用控制器端点按需读取/处理来自 DLQ 的消息。
是否可以使用 Spring Cloud Stream Kafka 来做到这一点。
我们没有在生产中使用执行器。所以不能使用 /bindings 端点。
spring-cloud-stream - Spring Cloud Stream Kafka 异常处理
我正在开发一个事件导致spring数据存储库保存数据的应用程序;
此代码可以抛出各种异常,例如 DataIntegrityViolationException ,它是运行时异常。
我的问题是如何
- 处理此类异常并
- 生成带有有效负载的消息导致此错误
- 有例外,
- 允许生产者采取行动。
apache-kafka - Google Functons 通过 spring-cloud-stream 和 spring-cloud-functions 从 Kafka 接收消息
我知道使用 Kafka Connect 您可以创建连接器将所有 kafka 消息接收到 PUBSUB 主题。
我想知道我是否可以将 Spring Cloud Stream 绑定器与 Spring Cloud Function 结合使用并将所有这些部署到 Google Functions。
在我理解正确的情况下,我可以结合 Spring Cloud Stream 和 Spring Cloud Function。这是否意味着我可以使用 Spring Cloud Stream 的 binder 来实际接受来自 Kafka 的消息?