问题标签 [spring-cloud-stream-binder-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1126 浏览

java - 使用 Spring Cloud Stream 的可靠异步批处理 kafka 生产者

我想使用 Spring Cloud Stream 设置一些转换(从一个主题消费,生产到另一个主题)。我也希望它是

  • 可靠 - 让我们说“至少一次”
  • 快速生产 - 每批次执行一次 producer.flush(),而不是每条消息
  • 快速偏移提交 - 每批从分区执行一次提交

使用原始 kafka 客户端,或者使用 spring-kafka 我会做下一个(假设生产者配置正确:acks=all,linger.ms > 0,batch-size 足够大等等):

  1. 获取消费消息(假设来自 consumer.poll())
  2. 为每条消息做我的自定义转换
  3. 每个消息的 producer.send(message).addCallback(...)
  4. 生产者.flush()
  5. 检查发送回调中是否没有错误
  6. consumer.commit() - 最高偏移量

问题是如何与 spring-cloud-stream 实现相似?我知道consumer.batch-mode = true和producer.sync = false,但我不确定将可靠产品与偏移提交绑定在一起的正确方法是什么

UPDATE 我想出了一个简单的解决方案(请注意,除了初始要求之外,我还需要动态路由和动态主题创建):

配置

应用程序.yml

我可以在这里看到什么缺点:

  1. 使用已弃用的 BinderAwareChannelResolver:不知道如何spring.cloud.stream.sendto.destination在我的情况下使用
  2. 在每批之后它会等待多余的 ${batch-timeout}
  3. 不确定producer.onSuccess方法的保证(如果“重复”成功调用是可能的),尤其是当来自单个传入批次的消息被路由到不同的主题时
0 投票
1 回答
1118 浏览

apache-kafka - Avro反序列化异常导致的Spring Cloud Stream无限重试

我正在使用带有 Kafka binder 和 Avro 的 Spring Cloud Stream 2.2.0 版。显然,错误的记录已发布到 Kafka 主题之一,导致所有消费者返回反序列化错误并进行某种无限重试。

从技术上讲,应该有一种方法可以指定反序列化异常的策略。我可以找到一些不同的策略,例如logAndContinuesendToDlq但它们适用于我在我的应用程序中不使用的 Kafka 流。如果有人可以帮助我了解我在这里缺少的东西,我将不胜感激。

0 投票
1 回答
2581 浏览

spring-boot - Spring Cloud Stream:Kafka生产者和消费者的多个绑定器具有单独的jaas配置不能一起工作

我正在尝试使用 spring cloud 和 binder 在同一个 Spring boot 应用程序中实现 Kafka 消费者和 Kafka 生产者。如果单独执行,两者都可以成功运行,但如果一起执行,则只有 Kafka Producer 能够成功连接到 kafka 集群,但 Kafka Consumer 无法登录到 Kafka 集群。我认为问题在于 kafka 生产者和消费者的多个/不同的 jaas 配置。请在我的 application.yml 文件下面找到

如果我在主类中使用@EnableBinding(Source.class) 或@EnableBinding(Sink.class) 运行相同的application.yml,它会以Kafka Producer 或Kafka Consumer 的身份成功连接kafka 集群。但是当我使用@EnableBinding(Processor.class)运行相同的application.yml时,我遇到了kafka消费者的错误(Kafka Producer工作正常并连接到Kafka集群)。问题仅与 Kafka 消费者有关:无权访问该主题。

请检查并阐明如何在 Spring Cloud Stream binder application 中传递多个 jaas 配置

0 投票
0 回答
341 浏览

spring-boot - 创建 org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration 中定义的名称为“compositeMessageChannelConfigurer”的 bean 时出错

在此处输入图像描述

微服务一:

artefact id - microservice1 定义了 KafkaStreams.java 接口,其中定义了 binder 属性,如下所示。

微服务 2: artefact id - microservice2

在微服务 2 中添加依赖微服务 1 并给云属性代理 conf 和主题名称。

堆栈跟踪 :

0 投票
2 回答
381 浏览

spring-kafka - Spring Cloud 函数从 strem 创建 GlobalKTable

是否有一个示例说明如何使用 Spring Cloud 流和使用函数方法创建 GlobalKTable 以保持 KStream 的计数?

0 投票
1 回答
254 浏览

spring - 如何访问 kafka 物化视图中的标题信息?

如何在物化视图中访问自定义标题?我正在尝试在我的应用程序中构建一些自定义 dlq 逻辑,并希望构建基于标头信息的重试机制。实际重试由调度程序触发,该调度程序应在物化视图中查找这些标头信息。

以下是一些代码片段:

创建物化视图:

调度器:

0 投票
1 回答
837 浏览

spring-kafka - 如何为每个 StreamListener 的 ConcurrentKafkaListenerContainerFactory 设置并发(或其他配置)

我们的应用程序(基于spring-cloud-stream)监听多个Kafka主题(TOPIC_A有3个分区,TOPIC_B有1个分区,TOPIC_C有10个分区)即3个@StreamListener方法。

我们需要自定义错误处理和重试机制,因此通过配置 ConcurrentKafkaListenerContainerFactory bean 来实现。

问题是现在我们需要 KafkaListenerContainer 的一些属性来改变每个@StreamListener(即在这种情况下是每个主题),比如说 TOPIC_A 的并发性为 3,TOPIC_C 的并发性为 10 等,而不是工厂上设置的通用并发性或设置 SeekToCurrentErrorHandler TOPIC_A,TOPIC_C 但不是 TOPIC_B(或某些主题的不同 ErrorHandler)。

每个容器级别如何实现这一点?


尝试使用下面共享的反射解决方案后的堆栈跟踪

0 投票
1 回答
183 浏览

spring-boot - 将 Spring Integration Router 与 Spring Cloud Stream 一起使用

我一直在尝试将 Spring Integration 的@Router与 Spring Cloud Stream 与 Kafka 绑定一起使用。我的理解是,当您List从带有注释的方法中返回一个通道名称时,@Router它们应该被生成到指定的 Kafka 主题中。但我没有看到正在生成的消息。

Spring Integration 是否@Router适用于 Spring Cloud Stream。如果没有,还有什么替代方法以及如何以编程方式路由到运行时选择的通道?

0 投票
1 回答
727 浏览

spring-kafka - 使用 @StreamListener 时,对 KafkaListenerContainerFactory 的自定义会反映在生成的 KafkaMessageListenerContainer 中吗?

我正在使用带有 kafka binder 的 spring-cloud-stream 来使用来自 kafka 的消息。该应用程序基本上是使用来自 kafka 的消息并更新数据库。

在某些情况下,DB 出现故障(可能会持续数小时)或其他一些临时技术问题。由于在这些情况下,在有限的时间内重试消息然后将其移动到 DLQ 是没有意义的,所以当我们遇到某些类型的异常(例如 DBHostNotAvaialableException)时,我试图实现无限次数的重试

为了实现这一点,我尝试了两种方法(两种方法都面临问题) -

  1. 在这种方法中,尝试在配置 ConcurrentKafkaListenerContainerFactory bean 时在容器属性上设置错误处理程序,但错误处理程序根本没有被触发。在调试流程时,我在创建的 KafkaMessageListenerContainer 中实现了 errorHandler 字段,因此它们使用默认的 LoggingErrorHandler。下面是我的容器工厂 bean 配置 - 这种方法的 @StreamListener 方法与第二种方法相同,除了对消费者的搜索。

    /li>

我在配置工厂 bean 时是否遗漏了一些东西,或者这个 bean 仅与 @KafkaListener 而不是 @StreamListener 相关?

  1. 第二种选择是尝试使用手动确认和查找来实现它,在 @StreamListener 方法中从标头获取确认和使用者,如果收到可重试的异常,我会使用 retrytemplate 进行一定次数的重试,当这些重试用尽时我会触发consumer.seek(). 下面的示例代码 -

    }

这种方法的问题- 因为我正在执行 consumer.seek() 而可能有来自上次轮询的待处理记录,如果 DB 在此期间出现(因此出现故障),这些记录可能会被处理和提交。有没有办法在执行搜索时清除这些记录?

PS - 我们目前处于 2.0.3.RELEASE 版本的 spring boot 和 Finchley.RELEASE 或 spring 云依赖项(因此也不能使用否定确认等功能,目前无法升级)。

0 投票
1 回答
84 浏览

spring-boot - Spring Cloud Stream 根据消息动态解析输入通道

我需要一种根据传入消息的类型动态解析入站通道的方法。我不是在寻找此链接 https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream 中已经提到的任何基于标头的解决方案。 html#_using_streamlistener_for_content_based_routing

必须根据消息的类型进行解析。如果有一个自定义绑定可以在应用程序启动时完成以便能够做到这一点,那应该没问题;请给我一些关于如何实现这一目标的示例。