如果这是一个重复的帖子,我很抱歉,但我一直在寻找答案,但到目前为止一无所获。
我需要的是在更改 Kafka Streams 中的密钥类型的映射操作期间指定 serdes。原始的 KStream 具有字符串类型的键和 avro (GenericRecord) 值,但我需要将其重新映射到 avro 键和值。这些方面的东西:
KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
GenericRecord newKey = new GenericData.Record(someSchema);
...
return new KeyValue(newKey, newValue);
});
我相信我需要指定 serde,因为类型正在更改,但我发现无法在地图运算符上执行此操作。从主题读取、分组或回写主题时,我们通常可以执行以下操作来覆盖默认的 serdes:
KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic",
Consumed.with(keySerde, valueSerde));
KGroupedStream<GenericRecord, GenericRecord> groupedStream =
inputStream.groupBy((key, value)->somethingThatChangesTheKey(),
Grouped.with(newKeySerde, newValueSerde));
inputStream.to("someTopic", Produced.with(keySerde,valueSerde));
然而,当类型发生变化时,我完全不知道如何在地图中指定 serdes,在这种特殊情况下,我不能使用我的应用程序的默认 serdes。
我最接近找到解决方案的是这里的这篇文章,但恐怕接受的回复告诉 OP 他需要指定 serdes,但不是在地图期间如何做到这一点(至少据我了解,我可能弄错了)。
任何见解将不胜感激。