问题标签 [embedded-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
258 浏览

unit-testing - 使用 KafkaEmbedded 是否不能保证消息顺序?

我使用KafkaEmbedded(and KafkaTemplate) 进行了单元测试,但消息顺序是随机的。有谁知道这是否合乎逻辑,是否可能是担保令?

这是我的代码:

例如,此代码打印(但顺序可以更改):

0 投票
1 回答
150 浏览

spring-boot - 使用 EmbeddedKafka 时为每个 Spock 规范重新加载应用程序上下文

我有一个带有集成测试设置的 Spring Boot 应用程序。我有一个基本的 groovy 类,我在每个规范中都进行了扩展。Spring boot version 2.0.6 我的问题是每个规范都重新启动了应用程序上下文。

我将不胜感激这方面的任何帮助。谢谢。

人们在使用 DirtiesContext 或 MockBean 时似乎遇到了这个问题。我也没有。

这就是基地的样子

我希望我所有的测试都使用相同的上下文。

0 投票
0 回答
317 浏览

apache-kafka - 在不使用连接器的情况下将数据库数据流式传输到 Kafka 主题

我有一个用例,我必须将所有 MySQL 数据库数据推送到 Kafka 主题。现在,我知道我可以使用 Kafka 连接器启动并运行它,但我想了解它是如何在不使用连接器的情况下在内部工作的。在我的 Spring Boot 项目中,我已经创建了一个 Kafka Producer 文件,我在其中设置了所有配置,创建了一个 Producer 记录等等。

有没有人尝试过这种方法?任何人都可以对此有所了解吗?

0 投票
2 回答
1094 浏览

java - Spring-cloud-stream MessageConversionException

我有一个接受@Payload 字符串的@StreamListener。为了测试这个 Listener 类,我使用嵌入式 Kafka 编写了一个 Junit 类。运行测试类时出现错误

错误

错误 osihandler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException:GenericMessage 无法从 [[B] 转换为 [java.lang.String]


如果我将 @Payload 的 dataType 从 String 更改为 byte[] 消息将被我的侦听器类选中。

有人可以帮我知道这里的问题吗?我想这与云流配置有关。



这是我的 application.yaml 的样子。

0 投票
0 回答
832 浏览

java - EmbeddedKafka/Zookeeper 由于 ZkInterruptedException 无法启动:java.lang.InterruptedException

我正在尝试将 EmbeddedKafka ( https://github.com/spring-projects/spring-kafka/blob/master/src/reference/asciidoc/testing.adoc ) 与我的单元测试集成。

不总是,但我经常在 EmbeddedKafka 启动期间遇到错误。

pom.xml:

KafkaTopicUtilsTest.java 服务器初始化通过@Rule:

如前所述,当我在 InteliJ 中运行测试时,它几乎总是运行良好。

从 INteliJ ( Run 'KafkaTopicUtilsTest' ) 执行工作正常。

通过 maven mvn clean install执行测试失败。

显式测试执行mvn -Dtest=KafkaTopicUtilsTest 测试工作正常。

有人遇到过这样的问题吗?任何线索可能是什么问题?

问题已解决

该问题与其他测试用例有关。另一个测试(不使用 EmbeddedKafka)是抛出 InterrupedException 并检查代码是否正确响应。通过调用Thread.currentThread().interrupt()来保持中断状态。看起来虚拟机保持中断状态,EmbeddedKafka 对其做出反应。

0 投票
1 回答
2458 浏览

spring-cloud-stream - 在带有自定义通道绑定的春季云流测试中使用嵌入式 Kafka

我有一个 Spring Boot 应用程序,我在其中使用 spring-cloud-stream 从 kafka 主题中消费,进行一些处理并发布到另一个 kafka 主题。该应用程序运行良好,并且我编写了运行良好的单元测试(使用 TestBinder)。

我现在正在尝试使用嵌入式 Kafka 编写集成测试并测试端到端功能。我已经按照这里的示例https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests。 java来编写测试,但这不起作用 - 我无法收到有关输出主题的任何消息。

应用程序.yml

TransformerBinding.java

事件处理器.java

EventProcessorTest.java

上面的测试失败了,因为我期望存在 1 条记录,但我在输出主题中得到 0 条记录。

0 投票
1 回答
2274 浏览

spring-boot - 无法为 EmbeddedKafkaBroker 创建或验证数据目录

我正在尝试使用@EmbeddedKafka Annotation 运行一个简单的单元测试。作为参考,我正在关注以下春季文档 https://docs.spring.io/spring-kafka/reference/html/#embedded-kafka-annotation

我试图定义 log.dir @EmbeddedKafka(brokerProperties = "log.dir=") 因为我在运行测试时遇到错误。

我试过了 :

  • log.dir=/kafka-logs
  • log.dir=real_path_to_my_project/kafka-logs
  • ...

但是每次我运行测试时都会出现这个错误:

0 投票
1 回答
288 浏览

scala - 如何增加嵌入式 kafka 的 RAM?

我使用Embedded-kafka,但我有一些稳定工作的问题,反映在: Connection to node 0 (localhost/127.0.0.1:6001) could not be established. Broker may not be available.和超时。

我认为增加内存会有所帮助,但我找不到任何属性。我试图搜索代码memoryRAM单词,但没有运气。是否可以为嵌入式 kafka 增加 RAM?

0 投票
2 回答
1442 浏览

spring-boot - Spring Boot 找不到 EmbeddedKafkaBroker Bean(不使用 SpringbootTest)

我想将嵌入式 Kafka 用于 Spring Boot 应用程序。我可以使用嵌入式 Kafka 进行 Junit 测试,但是在尝试在主应用程序中使用时,嵌入式 Kafka 对象没有被识别。

尝试加载 Spring Boot 应用程序时,嵌入式 kafka 对象未自动装配。这适用于非测试流程。

....... ........ }

com.dell.pde.kafka.KafkaConsumerTestBase 中的字段 embeddedKafka 需要找不到类型为“org.springframework.kafka.test.EmbeddedKafkaBroker”的 bean。

注入点有以下注解: - @org.springframework.beans.factory.annotation.Autowired(required=true)

0 投票
2 回答
1213 浏览

scala - KafkaEmbedded 抛出 NoSuchMethodError:org.apache.kafka.common.config.ConfigDef$ValidString

我正在尝试使用嵌入式 kafka 编写功能测试。但在启动集群时出现以下错误:

异常或错误导致运行中止:org.apache.kafka.common.config.ConfigDef$ValidString.(Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$1;)V java.lang。 NoSuchMethodError:org.apache.kafka.common.config.ConfigDef$ValidString.(Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$1;)V at org.apache.kafka.common.config.ConfigDef $ValidList.(ConfigDef.java:895)`

我的 Pom.xml 有这个依赖