我正在使用 Kafka 0.8.2.1,运行一个具有 200 个分区和 RF=3 的主题,日志保留设置为大约 1GB。
未知事件导致集群进入“协调器负载”或“组负载”状态。一些信号表明了这一点:基于 pykafka 的消费者在 s 期间开始失败,对于某些分区子集,FetchOffsetRequest
错误代码为 14 。COORDINATOR_LOAD_IN_PROGRESS
这些错误是在使用自协调器加载之前就存在的消费者组消费时触发的。在代理日志中,出现了这样的消息:
[2018-05...] ERROR Controller 17 epoch 20 initiated state change for partition [my.cool.topic,144] from OnlinePartition to OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing leader for partition [my.cool.topic,144] due to: Preferred replica 11 for partition [my.cool.topic,144] is either not alive or not in the isr. Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}].
出于某种原因,Kafka 决定副本 11 是“首选”副本,尽管它不在 ISR 中。据我所知,当 11 重新同步时,消费可以从副本 12 或 13 不间断地继续 - 目前尚不清楚为什么 Kafka 选择非同步副本作为首选领导者。
上述行为持续了大约 6 个小时,在此期间 pykafka fetch_offsets 错误使消息无法消费。虽然协调器负载仍在进行中,但其他消费者组能够毫无错误地使用该主题。事实上,最终的解决方法是使用新的 consumer_group 名称重新启动损坏的消费者。
问题
- 协调器负载状态持续 6 小时是正常的还是预期的?此加载时间是否受日志保留设置、消息生成率或其他参数的影响?
- 非 pykafka 客户端是否
COORDINATOR_LOAD_IN_PROGRESS
仅通过使用非错误分区来处理?Pykafka 坚持所有分区都返回成功OffsetFetchResponse
的 s 可能是消耗停机时间的来源。 - 为什么 Kafka 在协调器加载期间有时会选择非同步副本作为首选副本?如何将分区领导者重新分配给 ISR 中的副本?
- 所有这些问题都没有意义,因为我应该只使用更新版本的 Kafka 吗?
代理配置选项:
broker.id=10
port=9092
zookeeper.connect=****/kafka5
log.dirs=*****
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
message.max.bytes=1000000
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=60000
log.flush.interval.messages=60000
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true