0

测试结果图 当我使用confluent.kafka-net时,我想保证kafka消息在.netcore中的顺序,我创建了一个mschannel,

_msChannel = Channel.CreateUnbounded();

收听时,我得到了消费结果和收到的消息,并且

if (_msChannel.Writer.TryWrite(receivedMessage))
{
    _consumer.Pause(new[] { consumeResult.TopicPartition });
    // _consumer.StoreOffset(consumeResult);

    // Console.WriteLine("Pause:" + sw1.ElapsedTicks + " ,Partition:" + consumeResult.TopicPartition.Partition);
}

当消息没有错误时,我使用message.Ack()如下:

public ValueTask Ack()
{
    try
    {
        _consumer.StoreOffset(_offset);
    }
    finally
    {
        var sw = Stopwatch.StartNew();
        try
        {
            _consumer.Resume(new[] { _offset.TopicPartition });
        }
        catch (Exception e)
        {

        }
        sw.Stop();

        Console.WriteLine("Resume:" + sw.ElapsedTicks + " ticks" + " ,Partition:" + _offset.TopicPartition.Partition);
    }

    return default;
}

结果是kafka的consumer很慢,我发现原因是partition的consumer和resume,耗费了太多的时间,但是我没有更好的办法来处理这种情况。

在我的服务器测试中,消费者每分钟消耗约 300 条记录。

4

0 回答 0