15

我们启动一个 Kafka 消费者,监听一个可能尚未创建的主题(虽然启用了主题自动创建)。

不久之后,生产者发布了关于该主题的消息。

但是,消费者需要一些时间才能注意到这一点:准确地说是 5 分钟。此时消费者撤销其分区并重新加入消费者组。卡夫卡重新稳定了团队。查看消费者与 kafka 日志的时间戳,这个过程是在消费者端启动的。

我想这是预期的行为,但我想了解这一点。这实际上是在进行重新平衡(从 0 分区到 1 分区)吗?如果我们提前创建主题,这不会发生吗?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2017-02-01 08:41:45.540  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning

卡夫卡日志

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator)
4

1 回答 1

12

这可能是由于参数metadata.max.age.ms的默认值控制了消费者强制刷新主题元数据的频率。

当您使用不存在的主题启动消费者时会发生什么是代理自动创建该主题,但这需要一些时间来进行领导者选举等,因此当您的消费者请求该主题的元数据时,它会收到 LEADER_NOT_AVAILABLE 警告和无法获取任何消息。达到上述超时后,消费者刷新元数据,这次成功并开始读取消息。这不依赖于生产者向主题写入消息,它纯粹是消费者的事情。

如果您使用例如 1000 毫秒超时启动您的消费者,您应该会看到更短的延迟,直到消息被消费。

此外,如果您预先创建主题,或者在消费者之前启动生产者,则根本不应该发生这种行为。

于 2017-02-01T13:08:39.290 回答