3

我有三个 MSK 集群;开发、非产品和产品。它们都有以下集群配置——没有主题级别的配置。

auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
log.retention.hours=100
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

Dev 和 Nonprod 正在清除设置中定义的超过 100 小时的消息log.retention.hours=100

我们有更多的流量通过我们的生产集群,旧消息没有被删除。集群上仍有数十万条超过 400 小时的消息。我考虑过添加更多的配置设置,例如

segment.bytes
segment.ms

为了更快地滚动段,因为段可能还没有滚动并且不能标记为删除 - 但是这个相同的配置在其他集群中运行良好,尽管没有接收到那么多的流量。

4

1 回答 1

2

所以这被证明是生产者以美国日期格式而不是英国日期格式向 Kafka 发送消息的问题。因此,它创建的消息似乎会在未来加盖时间戳 - 因此不超过 100 小时且符合删除条件。

为了删除现有消息,我们设置了与设置log.retention.bytes无关的修剪消息log.retention.hours。这导致 kafka 主题被修剪并删除错误消息 - 然后我们取消设置log.retention.bytes

接下来,我们设置log.message.timestamp.type=LogAppendTime以确保消息被标记为与文档时间相对的队列时间。这将防止生产者的错误日期在将来再次导致此问题。

于 2021-03-08T15:48:28.447 回答