0

我正在使用Confluent Kafka .NET为分区主题创建消费者。

由于 Confluent Kafka .NET 不支持批量消费,我构建了一个函数来消费消息,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用具有不同分区的结果并返回到目前为止我能够使用的任何数量的消息时我停止构建批处理的原因.

目标或目的:我希望能够处理我在批处理中返回的消息,并仅提交这些消息的偏移量。IE:

从分区消费的消息 抵消 批量存储
0 0 是的
0 1 是的
2 0

从上表中,我想处理我从分区 0 获得的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对ConsumeBatch的另一个调用中拾取。

要提交,我只需调用同步Commit函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上表中显示的批次的第二条消息的偏移量(分区 0 - 偏移量 1)。

问题:

问题是,由于某种原因,当我像上面显示的那样构建一个批处理时,由于验证而决定不处理的消息将被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。

正如您在下面的消费者配置中看到的那样,我将EnableAutoCommitEnableAutoOffsetStore都设置为 false。我认为这足以让消费者不对偏移量做任何事情,并且能够在另一个Consume调用中接收被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。

如果可能的话,任何人都可以告诉我我在这里缺少什么以实现所需的行为吗?

构建批处理的函数的简化版本:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100);
        
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
                break;
        }
        else
            break;
    }

    return consumedMessages;
}

ConsumerConfig 用于实例化我的消费者客户端:

_consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _bootstrapServers,
            EnableAutoCommit = false,
            AutoCommitIntervalMs = 0,
            GroupId = "WorkerConsumers",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoOffsetStore = false,
        };

附加信息: 正在测试:

  • 1 个主题,6 个分区,复制因子为 2
  • 3个经纪人
  • 1 个属于一个消费者组的单线程消费者客户端
  • Windows 10 上带有 wsl2 的本地环境
4

1 回答 1

1

关键是使用该Seek函数将分区的偏移量重置为特定位置,以便可以将忽略的消息作为另一批次的一部分再次拾取。

在上面的相同函数中:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100);
    
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
            {
                // This call will guarantee that this message that will not be included in the current batch, will be included in another batch later
                _consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!!!!!!!
                break;
            }
        }
        else
            break;
    }

    return consumedMessages;
}

我认为一般来说,如果您想在不以任何方式更改偏移量的情况下使用消息(有点偷看主题分区),您可以调用Consume然后使用Seek(result.TopicPartitionOffset)将该主题分区的偏移量设置回使用消息之前的位置.

于 2021-09-11T00:11:44.547 回答