问题标签 [embedded-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
44 浏览

spring-cloud - 在每次测试方法后重置没有 EmbeddedKafka 的全局存储

我正在编写一个测试类,它有多种需要 Kafka 的方法。我的应用程序使用 kafka 流全局存储,并且每个测试都需要清除存储。如何在不重新启动 EmbeddedKafka 或为每个测试重置 EmbeddedKafka 的情况下清除每个测试的存储?

0 投票
0 回答
72 浏览

java - 使用带有 MockSchemaRegistryClient 的 EmbeddedKafka 使用 Spring 云流创建测试时出现问题

我试图弄清楚如何测试我的 Spring Cloud Streams Kafka-Streams 应用程序。

该应用程序看起来像这样:

流 1:主题 1 > 主题
2 流 2:主题 2 + 主题 3 加入 > 主题 4
流 3:主题 4 >主题 5

我尝试了不同的方法,例如 TestChannelBinder,但这种方法仅适用于 Simple 函数,而不适用于 Streams 和 Avro。

我决定将 EmbeddedKafka 与 MockSchemaRegistryClient 一起使用。我可以生产一个主题,也可以再次从同一个主题(topic1)消费,但我无法从(topic2)消费。

在我的测试 application.yaml 中,我放置了以下配置(我现在只测试第一个流,一旦它工作我想扩展它):

我的测试如下所示:

尝试从 topic2 消费和反序列化时收到以下异常:

不会有消息被消费。

所以我试图覆盖 SpecificAvroSerde 并直接注册模式并使用这个反序列化器。

使用这个解串器它也不会工作。有没有人有关于如何使用 EmbeddedKafka 和 MockSchemaRegistry 进行此测试的经验?还是我应该使用另一种方法?

如果有人可以提供帮助,我很高兴。先感谢您。

0 投票
0 回答
41 浏览

spring - EmbeddedKafka 和 Spring Cloud Stream 无法正常工作

我对卡夫卡很陌生。

我正在尝试为 Kafka Producer 编写集成测试。我在项目中还有 Spring Cloud Stream Function 以及 Avro 序列化器,所以我的生产者看起来像:

我的测试类看起来像:

但测试失败,因为没有收到任何消息。我验证了生产者和消费者具有相同的主题和代理地址。看起来 EmbeddedKafka 侦听器没有从供应商那里订阅 Flux。

我有以下依赖项:

所以我没有spring-cloud-stream-test-support我知道会导致问题的东西。

在测试中application.properties我添加了:

由于我的测试类配置,这是多余的,但现在没关系。

我也有主要的 application.yml 配置:

所以,让我们假设它使用真正的模式注册表。这不利于测试,但现在不是问题。

可能有人可以推荐另一种方法来测试我的制作人吗?我需要一种使用 Spring Cloud Function、Kafka 绑定发送测试消息的方法。

先感谢您。

0 投票
1 回答
87 浏览

java - NoClassDefFoundError: scala/collection/convert/AsJavaExtensions

我在 Java 测试中创建 EmbeddedKafkaCluster,但出现以下异常,但我添加了具有 scala 依赖关系的 kafka_2.12 依赖关系。

Java 版本:11

添加了以下依赖项

例外:

0 投票
1 回答
185 浏览

spring-boot - 使用 EmbeddedKafka 进行集成测试:ContainerTestUtil.waitForAssignment throws Expected 1 but got 0 partitions

我使用 spring-kafka 库为我的 kafka 消费者编写了一个集成测试。此测试使用 EmbeddedKafka。使用具有一个分区的主题。我为此使用了 KafkaMessageListener 容器。但我在这一行遇到错误

我得到的错误是:

java.lang.IllegalStateException:预期为 1,但得到了 0 个分区。

我提到的代码是:https ://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/

getKafkaConsumer() 函数是:

0 投票
0 回答
34 浏览

integration-testing - 在嵌入式 kafka 的集成测试中启动 Kafka Streams 应用程序

我正在尝试使用嵌入式 kafka 进行 kafka 流应用程序的集成测试。我能够在嵌入式 kafka 中设置主题并向该主题生成消息。但是当我使用以下 KafkaStreams kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams(); 启动我的 kafka 流时 StreamsBuilder 流构建器 = 新 StreamsBuilder(); KStream 流 = streamsBuilder.stream(INPUT_TOPIC);

流状态处于“重新平衡”状态。请确认以上是否是启动流应用程序的正确方法。谢谢

0 投票
1 回答
222 浏览

spring-boot - 如何在 Spring Boot 中向 EmbeddedKafka 发送具有特定偏移量的消息?

我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用特定偏移量的消息。我想模拟这个,因此我的目标是:

  1. 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
  2. 以相同的偏移量使用它们

这现在不起作用,即我正在发送带有 的消息KafkaHeaders.OFFSET,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。

KafkaTemplate<String, String>以标准方式初始化。另一方面,我以标准方式消费:

简而言之,接收到的消息的偏移量与onMessage消息构建期间设置的偏移量不同。

有什么办法可以做到这一点?

0 投票
2 回答
635 浏览

java - 嵌入式 Kafka Spring 测试在嵌入式 Kafka 准备就绪之前执行

我有一个 Spring Boot 项目,它有一个 Kafka 侦听器,我想使用 Embedded Kafka 进行测试。我让 Kafka Listener 注销消息“收到记录”。仅当我在方法的开头添加 a 时才会注销Thread.sleep(1000)

测试类:

我不想使用善变Thread.sleep()的测试显然是在一些设置过程完成之前执行的。我显然需要等待某些事情,但我不知道该怎么做。

使用:

  • 爪哇 11
  • 春季启动 2.5.6
  • JUnit 5
  • spring-kafka-test 2.7.8
0 投票
1 回答
53 浏览

spring - 如何在同一测试规范上创建多个 EmbeddedKafka bean

如何创建多个 EmbeddedKafka,现在我在 EmbeddedKafka 上定义了两个主题。

在上面的块中,我将更改 replication.factor

0 投票
0 回答
17 浏览

apache-kafka - 使用带有 Schema Registry 的 Scala 对 EmbeddedKafka 进行单元测试

我需要在 Spark 应用程序中使用嵌入式 Kafka 对我们的 Kafka 应用程序进行单元测试。

EmbeddedKafka 有许多使用 java SpringBoot 或使用 Kafka Streams 的示例,但未能为使用 Scala 的 EmbeddedKafka 获得一些好的参考。

我尝试了下面的代码作为开始,但以我有限的知识无法进一步进行。

如果有人分享他们的知识或 Url 关于使用 Scala 和模式注册表使用嵌入式 kafka 的知识,这真的很有帮助吗?