0

我使用 Apache Kafka 作为消息处理器,并在 Asp.Net Core 中使用 Confluent.Kafka 作为消费者。我必须使用几个消费者来处理来自主题的所有消息。当我没有任何异常时,我的代码可以完美运行。2 个消费者并行工作。如果我在消息处理过程中出现异常,就会出现问题。有时(实际上并不经常)我有偏移量重复,这会导致重新处理已处理的消息。

这是代码,它模拟处理过程中抛出的随机异常。再有一次,如果我删除随机异常并启动 2 个消费者,一切正常。但除了我有偏移重复...

class Program
{
    private const string Topic = "some-topic";

    private static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "host list here",
            GroupId = "some-group",
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        while (true)
        {
            try
            {
                using IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();

                consumer.Subscribe(Topic);

                while (true)
                {
                    try
                    {
                        const int batchSize = 5;

                        var readOnlyCollection = consumer.ConsumeBatch(batchSize);

                        bool error = new Random().Next(100) > 70;

                        foreach (ConsumeResult<Ignore, string> consumeResult in readOnlyCollection)
                        {
                            if (error)
                            {
                                Console.WriteLine($"Stop at Offset: {consumeResult.Offset.Value}");
                                throw new Exception("Something happened");
                            }

                            Console.WriteLine($"Offset: {consumeResult.Offset.Value}");
                        }

                        Thread.Sleep(500);

                        consumer.Commit();

                        Thread.Sleep(500);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Inner exception: {ex.Message}");
                        consumer.Close();
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Outer exception: {ex.Message}");
            }
        }
    }
}

这是批处理的扩展:

static class ConsumerExtensions
{
    public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(
        this IConsumer<TKey, TValue> consumer, int maxBatchSize, CancellationToken cancellationToken = default)
    {
        var message = consumer.Consume(cancellationToken);

        if (message?.Message is null)
            return Array.Empty<ConsumeResult<TKey, TValue>>();

        var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };

        while (messageBatch.Count < maxBatchSize)
        {
            message = consumer.Consume(TimeSpan.Zero);
            if (message?.Message is null)
                break;

            messageBatch.Add(message);
        }

        return messageBatch;
    }
}

我应该怎么办?

4

0 回答 0