测试结果图 当我使用confluent.kafka-net时,我想保证kafka消息在.netcore中的顺序,我创建了一个mschannel,
_msChannel = Channel.CreateUnbounded();
收听时,我得到了消费结果和收到的消息,并且
if (_msChannel.Writer.TryWrite(receivedMessage))
{
_consumer.Pause(new[] { consumeResult.TopicPartition });
// _consumer.StoreOffset(consumeResult);
// Console.WriteLine("Pause:" + sw1.ElapsedTicks + " ,Partition:" + consumeResult.TopicPartition.Partition);
}
当消息没有错误时,我使用message.Ack()
如下:
public ValueTask Ack()
{
try
{
_consumer.StoreOffset(_offset);
}
finally
{
var sw = Stopwatch.StartNew();
try
{
_consumer.Resume(new[] { _offset.TopicPartition });
}
catch (Exception e)
{
}
sw.Stop();
Console.WriteLine("Resume:" + sw.ElapsedTicks + " ticks" + " ,Partition:" + _offset.TopicPartition.Partition);
}
return default;
}
结果是kafka的consumer很慢,我发现原因是partition的consumer和resume,耗费了太多的时间,但是我没有更好的办法来处理这种情况。
在我的服务器测试中,消费者每分钟消耗约 300 条记录。