0

.net kafka 中没有 Poll 方法。我想轮询 50 条消息并通过 10 个线程并行处理。但它不支持 .

这是我找到的解决方案。

public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(this IConsumer<TKey, TValue> consumer,CancellationToken cancellationToken,  int maxBatchSize)
            {
                var message = consumer.Consume(cancellationToken);

                if (message?.Message is null)
                    return Array.Empty<ConsumeResult<TKey, TValue>>();

                var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };

                while (messageBatch.Count < maxBatchSize)
                {
                    message = consumer.Consume(TimeSpan.Zero);
                    if (message?.Message is null)
                        break;

                    messageBatch.Add(message);
                }

                return messageBatch;
            }

现在我可以收到 100 条消息。但是如何处理提交?如果我错了,请纠正我。

例如;

分区中有 300 条消息。

我将自动提交设置为 true,间隔为 5 秒;

它检查每个调用的 Consume 方法 5 秒。

第一次消费开始计时器。我在 while 循环中收到 99 条消息。这意味着我调用了 100 次消费方法。每个消耗方法检查时间为 5 秒。当我收到最后一条消息时。我花了500毫秒。还有 4.5 秒的时间来提交。

我花了 8 秒处理 100 条消息。但我还没有调用下一个消费方法。并且偏移量没有提交。当我下次消费时,消费者将承诺。偏移量将是 100。因为提交的时间到了。

当我再次消费时。计时器重新启动 5 秒。

那正确吗 ?

或者提交独立于消费方法。后台任务每 5 秒提交一次

4

0 回答 0