我创建了一个 kafka 消费者,它在单个分区主题的批处理上执行。每隔 15 分钟,我的批处理将执行并使用已发布到主题的所有新消息。在当时消费完所有可用消息后,批处理将退出。
以下是我如何实现此行为的示例代码:
CancellationTokenSource cts = new CancellationTokenSource();
bool isPartitionEof = false;
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
consumer.Subscribe("topicName");
while (!isPartitionEof)
{
ConsumeResult<Ignore, string> consumeResult = consumer.Consume(cts);
isPartitionEof = consumeResult.IsPartitionEof;
if (consumeResult.Message != null)
{
// consume logic
}
}
}
一旦主题转到多分区,上述逻辑将不再起作用,因为到达一个分区的末尾将提前退出批处理应用程序。有没有一种方法可以遍历主题上的分区并在分区的基础上使用消息?
我在网上找到的大多数示例都是面向服务的 Kafka 消费者,当消息发布到主题时,它们会无限轮询和消费。不幸的是,我的情况有点独特,这要求我有一个明确的条件,以便在所有消息都被消耗完并退出批处理应用程序后停止读取新消息。任何帮助将不胜感激!