我写了一个测试类,我将在其中测试主题删除
我找到了EmbeddedKafkaBroker
这个方法
embeddedKafkaBroker.addTopics("new-topic");
这使我能够创建一个新主题,但我找不到任何可以删除主题的东西。
那么,如何测试主题删除?
我写了一个测试类,我将在其中测试主题删除
我找到了EmbeddedKafkaBroker
这个方法
embeddedKafkaBroker.addTopics("new-topic");
这使我能够创建一个新主题,但我找不到任何可以删除主题的东西。
那么,如何测试主题删除?
看到这个:
/**
* Create an {@link AdminClient}; invoke the callback and reliably close the admin.
* @param callback the callback.
*/
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
您可以在哪里使用:
/**
* This is a convenience method for {@link #deleteTopics(TopicCollection, DeleteTopicsOptions)}
* with default options. See the overload for more details.
* <p>
* This operation is supported by brokers with version 0.10.1.0 or higher.
*
* @param topics The topic names to delete.
* @return The DeleteTopicsResult.
*/
default DeleteTopicsResult deleteTopics(Collection<String> topics) {
以 Artem 的回答为基础,由于所涉及事件的异步性,这可能有点棘手。
如果主题在 Kafka 代理中不存在,则 in 中的describeTopics
方法KafkaAdminClient
会抛出一个UnknownTopicOrPartitionException
包装在 an 中的ExecutionException
内容,因此我们可以利用它来检查主题的存在并等待其创建或删除。
编辑:虽然上述策略确实有效,但正确的方法是等待未来完成,例如admin.deleteTopics(List.of(topicName)).all().get()
但是由于嵌入式代理不知道主题已被删除,因此embeddedKafkaBroker.getTopics()
结果不会更新,因此为了确定主题是否真的已被删除,我们需要使用该 admin.describeTopics(List.of(topicName)).all().get()
方法来代替。