1

问题

我的应用程序通过掩码从多个主题中读取消息,并因错误而崩溃:

Confluent.Kafka.ConsumeException: Broker: Message size too large

我试过的

第一个问题是我遵循了处理大消息的准则:fetch.max.bytesmessage.max.bytes并且receive.message.max.bytes配置设置为限制:

ConsumerSettings = new KafkaConsumerSettings(...)
    .With(x => x.ConsumerConfig.FetchMaxBytes   = 2_000_000_000)
    .With(x => x.ConsumerConfig.MessageMaxBytes = 1_000_000_000)
    .With(x => x.ConsumerConfig.ReceiveMessageMaxBytes = 2_000_000_000 + 512);
// `fetch.max.bytes` must be >= `message.max.bytes`
// `message.max.bytes` must be in range 1000..1000000000
// `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512

但是应用程序仍然无法处理该消息。所以我决定跳过这条消息

第二个问题是我找不到导致错误的消息。我已确定已停止处理的主题(CURRENT-OFFSET未更改LAG且不等于 0)。我检查了这些CURRENT-OFFSET消息 - 它们都很小:

$ kafka-consumer-groups.bat --bootstrap-server ***:9092  --group "Khajiit.GlobalCatalog.KpcCategoryCalculator.V1" --describe

at 2022.02.18:
TOPIC                                                             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    0          34254397        34876289        621892   
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    1          34713480        34713480        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    2          34717961        34717961        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    0          22851397        70600273        47748876 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    1          5062234         69250288        64188054 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    2          1538710         70360554        68821844 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    0          58093545        60426064        2332519  
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630        60451357        2323727   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1          445647          192860258       192414611
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2          236             192582179       192581943 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841        23028014        1209173   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     2          23055736        23055736        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684        50826978        1053294   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454        50854337        1052883   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137         10822241        7733104   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  2          6479987         10830654        4350667  

at 2022.02.21
TOPIC                                                             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    0          34254740        34876289        621549    (34254740 - 34254397 = 343      messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    1          34713480        34713480        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    2          34717961        34717961        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    0          54218443        70676247        16457804  (54218443 - 22851397 = 31367046 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    1          16219754        69328536        53108782  (16219754 - 5062234  = 11157520 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    2          1616692         70441024        68824332  (1616692  - 1538710  = 77982    messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    0          58096101        60426064        2329963   (58096101 - 58093545 = 2556     messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630        60451357        2323727   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1          2005847         193217223       191211376 (2005847  - 445647   = 1560200  messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2          374             192938130       192937756 (374      - 236      = 138      messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841        23028014        1209173   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     2          23055736        23055736        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684        50826978        1053294   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454        50854337        1052883   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137         10878226        7789089   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  2          6821193         10886253        4065060   (6821193  - 6479987  = 341206   messages processed)

current offset and the next two message sizes:
TOPIC                                                             PARTITION  CURRENT-OFFSET
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630    (530, 530, 708 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841    (545, 545, 545 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684    (690, 690, 693 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454    (521, 521, 699 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137     (557, 557, 557 bytes)

问题

如何识别导致错误的消息?

我猜一个解决方案将使用命令行实用程序输出具有分区/偏移量的主题的最大消息大小

4

0 回答 0