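I'm using Apache Kafka as the message broker and Confluent.Kafka as the consumer client in ASP.NET Core. I need several consumers to process all the messages from one topic. When no exceptions occur, my code works perfectly: the 2 consumers run in parallel. The problem appears when an exception is thrown during message processing. Sometimes (not often, actually) I get duplicate offsets, which means messages that were already processed get processed again.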
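Here is the code; it simulates random exceptions thrown during processing. Again, if I remove the random exception and start 2 consumers, everything works fine. With the exception in place, however, I get duplicated offsets...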
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    private const string Topic = "some-topic";

    private static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "host list here",
            GroupId = "some-group",
            EnableAutoCommit = false,            // offsets are committed manually below
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        while (true)
        {
            try
            {
                using IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();
                consumer.Subscribe(Topic);

                while (true)
                {
                    try
                    {
                        const int batchSize = 5;
                        var readOnlyCollection = consumer.ConsumeBatch(batchSize);

                        // Simulate a processing failure in roughly 30% of batches.
                        bool error = new Random().Next(100) > 70;
                        foreach (ConsumeResult<Ignore, string> consumeResult in readOnlyCollection)
                        {
                            if (error)
                            {
                                Console.WriteLine($"Stop at Offset: {consumeResult.Offset.Value}");
                                throw new Exception("Something happened");
                            }
                            Console.WriteLine($"Offset: {consumeResult.Offset.Value}");
                        }

                        Thread.Sleep(500);
                        // Commit() with no arguments commits the current position of every assigned partition.
                        consumer.Commit();
                        Thread.Sleep(500);
                    }
                    catch (Exception ex)
                    {
                        // Leave the group without committing the failed batch.
                        Console.WriteLine($"Inner exception: {ex.Message}");
                        consumer.Close();
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                // Loop around: a new consumer is built and rejoins the group.
                Console.WriteLine($"Outer exception: {ex.Message}");
            }
        }
    }
}
And here is the extension method used for batching:
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

static class ConsumerExtensions
{
    // Returns up to maxBatchSize messages: blocks for the first one,
    // then takes only what is already available without waiting.
    public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(
        this IConsumer<TKey, TValue> consumer, int maxBatchSize, CancellationToken cancellationToken = default)
    {
        var message = consumer.Consume(cancellationToken);
        if (message?.Message is null)
            return Array.Empty<ConsumeResult<TKey, TValue>>();

        var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };
        while (messageBatch.Count < maxBatchSize)
        {
            message = consumer.Consume(TimeSpan.Zero);   // non-blocking poll
            if (message?.Message is null)
                break;
            messageBatch.Add(message);
        }
        return messageBatch;
    }
}
What should I do?
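In the code above, `Commit()` runs only after a whole batch succeeds, so anything handled before a failure is not recorded as consumed. If real processing can fail partway through a batch, one option is to commit exactly the messages that were handled. The following is a minimal sketch under that assumption; `CommitPlanner` and `NextOffsets` are hypothetical names (not part of Confluent.Kafka), and the sketch assumes messages from the same partition are processed in order, so the processed ones form a prefix of that partition's share of the batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Confluent.Kafka.
// Assumes messages of one partition are processed in offset order.
static class CommitPlanner
{
    // For each (topic, partition), return the offset to commit: the highest
    // successfully processed offset + 1, because Kafka's committed offset
    // is the position of the *next* message to read.
    public static Dictionary<(string Topic, int Partition), long> NextOffsets(
        IEnumerable<(string Topic, int Partition, long Offset)> processed)
    {
        return processed
            .GroupBy(m => (m.Topic, m.Partition))
            .ToDictionary(g => g.Key, g => g.Max(m => m.Offset) + 1);
    }
}
```

With Confluent.Kafka, each planned entry could become `new TopicPartitionOffset(topic, partition, next)` and be passed to the `IConsumer.Commit(IEnumerable<TopicPartitionOffset>)` overload before `consumer.Close()`. This narrows the redelivery window but cannot close it: with manual commits a consumer group gives at-least-once delivery, so a crash or rebalance between processing a message and committing it can still replay that message, and processing should be idempotent if duplicates matter.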