0

如果这是一个重复的帖子,我很抱歉,但我一直在寻找答案,但到目前为止一无所获。

我需要的是在更改 Kafka Streams 中的密钥类型的映射操作期间指定 serdes。原始的 KStream 具有字符串类型的键和 avro (GenericRecord) 值,但我需要将其重新映射到 avro 键和值。这些方面的东西:

KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
   GenericRecord newKey = new GenericData.Record(someSchema);
   ...
   return new KeyValue(newKey, newValue);
});

我相信我需要指定 serde,因为类型正在更改,但我发现无法在地图运算符上执行此操作。从主题读取、分组或回写主题时,我们通常可以执行以下操作来覆盖默认的 serdes:

KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic", 
    Consumed.with(keySerde, valueSerde));

KGroupedStream<GenericRecord, GenericRecord> groupedStream = 
    inputStream.groupBy((key, value)->somethingThatChangesTheKey(), 
    Grouped.with(newKeySerde, newValueSerde));

inputStream.to("someTopic", Produced.with(keySerde,valueSerde));

然而,当类型发生变化时,我完全不知道如何在地图中指定 serdes,在这种特殊情况下,我不能使用我的应用程序的默认 serdes。

我最接近找到解决方案的是这里的这篇文章,但恐怕接受的回复告诉 OP 他需要指定 serdes,但不是在地图期间如何做到这一点(至少据我了解,我可能弄错了)。

任何见解将不胜感激。

4

1 回答 1

1

您不能指定新的 serde,map()因为map()不需要 serde。运算符本身获取一个map()输入对象并产生一个输出对象,但它从不序列化或反序列化任何消息。

只有读取或写入 Kafka 主题的操作员才允许您设置 serde,因为只有那些操作员才会使用 serde。

我不清楚你试图通过在map()操作员上设置一个 serde 来完成什么。你能详细说明吗?

于 2020-08-05T04:19:17.717 回答