1

我使用融合的 .net 客户端。订阅者总是在重启后(订阅者服务重启)读取来自 Kafka 主题的所有消息。如何提交消费者已经实现并从中读取的偏移量?也许一些消费者配置可以帮助......

4

2 回答 2

3

这只是一个疯狂的猜测,但是如何声明消费者的组 id?我见过一些使用这种随机分配的例子:

     ["group.id"] = Guid.NewGuid().ToString(),

如果您在group.id每次启动消费者时声明一个新的/随机的,这将导致在每次执行时注册一个新的消费者组,这涉及到启动auto.offset.reset

如果这个属性设置为“ earliest”,那么每次你启动你的消费者(假设他们每次都有不同的 group.id),他们将从第一个可用的偏移量开始,就像你的情况一样,再次从开始。

如果此属性设置为“ latest”,并且您的生产者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。

尝试设置一个固定的group.id:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个group.id

这一次,由于消费者组已经注册,auto.offset.reset将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在名为__consumer_offsets.

于 2020-12-07T07:57:47.190 回答
2

你有两个选择:

  • 通过设置消费者属性启用自动提交EnableAutoCommit = true(消息在可配置的时间后提交,通常为 5 秒),或者

  • 手动提交获取的偏移量consumer.Commit(consumeResult)

手动提交的示例显示在GitHub 上

于 2020-12-06T19:54:46.010 回答