我对 KafkaStreams 上的密钥反序列化有疑问。具体来说,我使用 Kafka Connect 和 debezium 连接器从 Postgres 表中读取数据。数据被导入到 Kafka 主题中,在 Kafka 模式注册表上创建了两个 Avro 模式,一个用于键,一个用于值(这包含表上的所有列)。
我在 GlobalKTable 上读取了这些数据,如下所示:
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
GlobalKTable<my.namespace.db.Key, my.namespace.db.Value> tableData = builder.globalTable("topic_name");
我的问题是我有一个拓扑,我需要将这个 GlobalKTable 与一个 KStream 连接起来,如下所示:
SpecificAvroSerde<EventObj> eventsSpecificAvroSerde = new SpecificAvroSerde<>();
eventsSpecificAvroSerde.configure(Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
conf.getString(" kafka.schema.registry.url")), false);
KStream<Integer, EventObj> events = builder.stream( "another_topic_name",Consumed.with(Serdes.Integer(),eventsSpecificAvroSerde))
请注意,my.namespace.db.Key 的 Avro 架构是
{
"type": "record",
"name": "Key",
"namespace":"my.namespace.db",
"fields": [
{
"name": "id",
"type": "int"
}
]
}
显然 GlobalKTable 和 KStream 上的键是不同的对象,我不知道如何实现连接。我最初尝试过这个,但没有奏效。
events.join(tableData,
(key,val) -> {return my.namespace.db.Key.newBuilder().setId(key).build();})
/* To convert the Integer Key in KStream to the Avro Object Key
on GlobalKTable as to achieve the join */
(ev,tData) -> ... );
我得到的输出如下,我可以在我的一个连接主题上看到一个警告(这似乎很可疑),但没有其他任何连接实体的输出,就好像没有什么可消耗的。
INFO [Consumer clientId=kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1-consumer, groupId=kafka-streams] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
INFO stream-thread [kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1-consumer] Assigned tasks to clients as {0401c29c-30a9-4969-93f9-5a83b3c834b4=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:341)
WARN [Consumer clientId=kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1-consumer, groupId=kafka-streams] The following subscribed topics are not assigned to any members: [my-topic] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:241)
INFO [Consumer clientId=kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1-consumer, groupId=kafka-streams] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Consumer clientId=kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1-consumer, groupId=kafka-streams] Setting newly assigned partitions [mip-events-2-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
INFO stream-thread [kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:346)
INFO KafkaAvroSerializerConfig values:
schema.registry.url = [http://kafka-schema-registry:8081]
auto.register.schemas = true
max.schemas.per.subject = 1000
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:175)
INFO KafkaAvroDeserializerConfig values:
schema.registry.url = [http://kafka-schema-registry:8081]
auto.register.schemas = true
max.schemas.per.subject = 1000
specific.avro.reader = true
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:175)
INFO KafkaAvroSerializerConfig values:
schema.registry.url = [http://kafka-schema-registry:8081]
auto.register.schemas = true
max.schemas.per.subject = 1000
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:175)
INFO KafkaAvroDeserializerConfig values:
schema.registry.url = [http://kafka-schema-registry:8081]
auto.register.schemas = true
max.schemas.per.subject = 1000
specific.avro.reader = true
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:175)
INFO stream-thread [kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1] partition assignment took 10 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []
(org.apache.kafka.streams.processor.internals.StreamThread:351)
INFO stream-thread [kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:346)
INFO stream-client [kafka-streams-0401c29c-30a9-4969-93f9-5a83b3c834b4]State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:346)
我可以让这个加入在 Kafka Streams 上工作吗?请注意,如果我使用 KTable 读取主题并在 KStream 上使用 selectKey 来转换密钥,则此方法有效,但我想避免重新分区。
或者正确的方法是否应该以另一种方式从数据库中导入我的数据以避免创建 Avro 对象,以及如何使用 debezium 连接器和启用 AvroConverter 的 KafkaConnect 来实现?