Problem
My application reads messages from several topics subscribed by a mask, and it crashes with the error:
Confluent.Kafka.ConsumeException: Broker: Message size too large
What I tried
First, I followed the guidelines for handling large messages and set the fetch.max.bytes, message.max.bytes and receive.message.max.bytes settings to their limits:
ConsumerSettings = new KafkaConsumerSettings(...)
.With(x => x.ConsumerConfig.FetchMaxBytes = 2_000_000_000)
.With(x => x.ConsumerConfig.MessageMaxBytes = 1_000_000_000)
.With(x => x.ConsumerConfig.ReceiveMessageMaxBytes = 2_000_000_000 + 512);
// `fetch.max.bytes` must be >= `message.max.bytes`
// `message.max.bytes` must be in range 1000..1000000000
// `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512
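To confirm that the values above are mutually consistent, the three rules from the comments can be encoded as a small check. This is a minimal sketch: `KafkaLimits` and `Check` are hypothetical names, and the helper only restates the constraints quoted in the comments; it does not query librdkafka or the broker. It uses `long` so that `fetch.max.bytes + 512` cannot overflow.

```csharp
using System.Collections.Generic;

// Hypothetical helper: encodes only the three constraints quoted in the
// config comments. It does not contact librdkafka or a broker.
public static class KafkaLimits
{
    public static List<string> Check(long fetchMaxBytes, long messageMaxBytes, long receiveMessageMaxBytes)
    {
        var problems = new List<string>();

        // `message.max.bytes` must be in range 1000..1000000000
        if (messageMaxBytes < 1000 || messageMaxBytes > 1_000_000_000)
            problems.Add("message.max.bytes must be in range 1000..1000000000");

        // `fetch.max.bytes` must be >= `message.max.bytes`
        if (fetchMaxBytes < messageMaxBytes)
            problems.Add("fetch.max.bytes must be >= message.max.bytes");

        // `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512
        if (receiveMessageMaxBytes < fetchMaxBytes + 512)
            problems.Add("receive.message.max.bytes must be >= fetch.max.bytes + 512");

        return problems;
    }
}
```

With the values from the settings above, `KafkaLimits.Check(2_000_000_000, 1_000_000_000, 2_000_000_000 + 512)` returns an empty list, so the consumer configuration itself satisfies these rules.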
But the application still cannot process the message, so I decided to skip it.
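The second problem is that I cannot find the message that causes the error. I identified the topic partitions where processing has stopped (CURRENT-OFFSET does not change and LAG is not 0) and checked the messages at those CURRENT-OFFSET values; they are all small: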
$ kafka-consumer-groups.bat --bootstrap-server ***:9092 --group "Khajiit.GlobalCatalog.KpcCategoryCalculator.V1" --describe
at 2022.02.18:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 0 34254397 34876289 621892
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 1 34713480 34713480 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 2 34717961 34717961 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 0 22851397 70600273 47748876
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 1 5062234 69250288 64188054
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 2 1538710 70360554 68821844
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 0 58093545 60426064 2332519
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 2 58127630 60451357 2323727 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1 445647 192860258 192414611
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2 236 192582179 192581943 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3 0 21818841 23028014 1209173 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3 2 23055736 23055736 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 0 49773684 50826978 1053294 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 1 49801454 50854337 1052883 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3 1 3089137 10822241 7733104 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3 2 6479987 10830654 4350667
at 2022.02.21:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 0 34254740 34876289 621549 (34254740 - 34254397 = 343 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 1 34713480 34713480 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3 2 34717961 34717961 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 0 54218443 70676247 16457804 (54218443 - 22851397 = 31367046 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 1 16219754 69328536 53108782 (16219754 - 5062234 = 11157520 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3 2 1616692 70441024 68824332 (1616692 - 1538710 = 77982 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 0 58096101 60426064 2329963 (58096101 - 58093545 = 2556 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 2 58127630 60451357 2323727 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1 2005847 193217223 191211376 (2005847 - 445647 = 1560200 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2 374 192938130 192937756 (374 - 236 = 138 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3 0 21818841 23028014 1209173 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3 2 23055736 23055736 0
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 0 49773684 50826978 1053294 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 1 49801454 50854337 1052883 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3 1 3089137 10878226 7789089 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3 2 6821193 10886253 4065060 (6821193 - 6479987 = 341206 messages processed)
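The "(no progress)" annotations above come from comparing two `--describe` snapshots by hand. The same comparison can be sketched in code, assuming the rows have already been parsed into `(topic, partition, CURRENT-OFFSET, LOG-END-OFFSET)`. `PartitionLag` and `StuckPartitions.Find` are hypothetical names; a partition counts as stuck when it still has lag in the later snapshot but its committed offset has not moved.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical type: one parsed row of `kafka-consumer-groups --describe` output.
public record PartitionLag(string Topic, int Partition, long CurrentOffset, long LogEndOffset)
{
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET
    public long Lag => LogEndOffset - CurrentOffset;
}

public static class StuckPartitions
{
    // Returns partitions that still have lag in the later snapshot but whose
    // committed offset is identical in both snapshots.
    public static List<(string Topic, int Partition, long Offset)> Find(
        IEnumerable<PartitionLag> before, IEnumerable<PartitionLag> after)
    {
        var earlier = before.ToDictionary(p => (p.Topic, p.Partition), p => p.CurrentOffset);

        return after
            .Where(p => p.Lag > 0
                        && earlier.TryGetValue((p.Topic, p.Partition), out var previous)
                        && previous == p.CurrentOffset)
            .Select(p => (p.Topic, p.Partition, Offset: p.CurrentOffset))
            .ToList();
    }
}
```

Fed with the 2022.02.18 and 2022.02.21 rows, this would flag Ubr.V3/2, Fg.V3/0, X5.V3/0, X5.V3/1 and Gepir.V3/1, matching the manual annotations; partitions with LAG 0 are excluded even though their offsets did not move.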
Sizes of the message at CURRENT-OFFSET and of the next two messages, for the partitions with no progress:
TOPIC PARTITION CURRENT-OFFSET
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 2 58127630 (530, 530, 708 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3 0 21818841 (545, 545, 545 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 0 49773684 (690, 690, 693 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3 1 49801454 (521, 521, 699 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3 1 3089137 (557, 557, 557 bytes)
Question
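How can I identify the message that causes the error?
I guess one solution would be a command-line utility that outputs the maximum message size in a topic together with its partition/offset.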
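The core of such a utility is a scan that keeps the largest record seen and, optionally, lists records above a size limit. This is a sketch under the assumption that some reader (not shown, and not a specific Kafka tool) already yields `(offset, size in bytes)` pairs for one partition; `LargestRecord`, `Find` and `OverLimit` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: (offset, serialized size in bytes) for records of one
// partition, produced by some reader that is not part of this sketch.
public static class LargestRecord
{
    // Returns the offset and size of the largest record (the first one on ties),
    // or null when the sequence is empty.
    public static (long Offset, int SizeBytes)? Find(IEnumerable<(long Offset, int SizeBytes)> records)
    {
        (long Offset, int SizeBytes)? best = null;
        foreach (var r in records)
        {
            if (best is null || r.SizeBytes > best.Value.SizeBytes)
                best = r;
        }
        return best;
    }

    // Returns every record whose size exceeds the given limit,
    // for example the configured message.max.bytes.
    public static List<(long Offset, int SizeBytes)> OverLimit(
        IEnumerable<(long Offset, int SizeBytes)> records, int limitBytes) =>
        records.Where(r => r.SizeBytes > limitBytes).ToList();
}
```

Running `Find` per stalled partition would report where the largest record sits; `OverLimit` narrows the output to records that could plausibly trigger a size error.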