我正在使用带有 MockSchemaRegistryClient的Spring Embedded Kafka Broker为我们的 kafka 系统构建集成测试。我正在为使用 Streams API (KStreamBuilder) 构建的一种 Stream 拓扑构建测试。这个特定的拓扑有一个 KStream (stream1) 馈送到 KTable (table1)。
当我将来自 table1 的 KTableProcessor 的输入输入到 stream1 时,我遇到了一个错误:
Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
**Caused by: java.io.IOException: Cannot get schema from schema registry!**
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
The KTableProcessor is attempting to deserialize an entry from the RocksDB state store, however the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**
As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query).
Since this is an internal topic, I don't expect to have to register this schema myself, however I'm failing because of the schema's absence in the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?
提前致谢!