0

我写了一个测试类,我将在其中测试主题删除

我找到了EmbeddedKafkaBroker这个方法

embeddedKafkaBroker.addTopics("new-topic");

这使我能够创建一个新主题,但我找不到任何可以删除主题的东西。

那么,如何测试主题删除?

4

2 回答 2

1

看到这个:

/**
 * Create an {@link AdminClient}; invoke the callback and reliably close the admin.
 * @param callback the callback.
 */
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {

您可以在哪里使用:

/**
 * This is a convenience method for {@link #deleteTopics(TopicCollection, DeleteTopicsOptions)}
 * with default options. See the overload for more details.
 * <p>
 * This operation is supported by brokers with version 0.10.1.0 or higher.
 *
 * @param topics The topic names to delete.
 * @return The DeleteTopicsResult.
 */
default DeleteTopicsResult deleteTopics(Collection<String> topics) {
于 2022-02-28T20:33:52.180 回答
0

以 Artem 的回答为基础,由于所涉及事件的异步性,这可能有点棘手。

如果主题在 Kafka 代理中不存在,则 in 中的describeTopics方法KafkaAdminClient会抛出一个UnknownTopicOrPartitionException包装在 an 中的ExecutionException内容,因此我们可以利用它来检查主题的存在并等待其创建或删除。

编辑:虽然上述策略确实有效,但正确的方法是等待未来完成,例如admin.deleteTopics(List.of(topicName)).all().get()

但是由于嵌入式代理不知道主题已被删除,因此embeddedKafkaBroker.getTopics()结果不会更新,因此为了确定主题是否真的已被删除,我们需要使用该 admin.describeTopics(List.of(topicName)).all().get()方法来代替。

于 2022-02-28T21:44:27.380 回答