
I'm building integration tests for our Kafka system using the Spring Embedded Kafka Broker together with a MockSchemaRegistryClient. I'm writing a test for a stream topology built with the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding a KTable (table1).

When I feed input into stream1, I hit an error in table1's KTableProcessor:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store, however the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query). 
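For context, the **-key** suffix comes from the Confluent serializers' default subject naming (TopicNameStrategy), which derives the registry subject from the topic name plus `-key` or `-value` depending on which side is being (de)serialized. A sketch of that convention, for illustration only (not the library's actual code):

```java
public class SubjectNaming {

    // Confluent's default TopicNameStrategy: subject = "<topic>-key" or
    // "<topic>-value". Sketched here for illustration; the real logic lives
    // inside the Confluent serializers.
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        String changelog = "appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog";
        // The key subject was registered when the query key was serialized;
        // the value subject was never registered, hence the lookup failure.
        System.out.println(subjectFor(changelog, true));
        System.out.println(subjectFor(changelog, false));
    }
}
```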

Since this is an internal topic, I don't expect to have to register this schema myself, however I'm failing because of the schema's absence in the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?

Thanks in advance!


1 Answer


Solved the problem, which I will now sheepishly recount: when using a KTable (via the Streams API) with an embedded Kafka broker, you need to configure the KafkaStreams object with a state store directory that is unique per run of the embedded broker (in my case, per test run).

You can control the state store directory via the StreamsConfig.STATE_DIR_CONFIG setting. I made it unique by appending a timestamp to the default state store directory:

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());

The problem was that each time the embedded Kafka broker was initialized, the old state store still existed in the same location. When the first record was fed into the KTable's topic, the state store was therefore able to return a previous value. This triggered an attempt to deserialize a state-store record that had never (as far as this schema registry instance was concerned) been serialized. Schemas are registered only at serialization time, so the deserialization attempt failed because of the missing registered schema.
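Putting the fix together, a minimal sketch of building the Streams properties with a per-run state directory (the `/tmp/kraken-streams` path is the one from the answer; the string key `"state.dir"` is the value behind `StreamsConfig.STATE_DIR_CONFIG`, used here so the snippet stays free of Kafka dependencies):

```java
import java.time.LocalDateTime;
import java.util.Properties;

public class UniqueStateDir {

    // Builds Streams properties whose state store directory is unique per
    // run, so stale RocksDB state from an earlier embedded-broker run can
    // never be read back (and thus never needs a schema that was only
    // registered in a previous MockSchemaRegistryClient instance).
    static Properties streamsProps() {
        Properties properties = new Properties();
        // "state.dir" is the config key behind StreamsConfig.STATE_DIR_CONFIG
        properties.put("state.dir",
                "/tmp/kraken-streams/" + LocalDateTime.now().toString());
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("state.dir"));
    }
}
```

In the test itself you would pass these properties (together with the usual bootstrap servers, application id, and serde configuration) into the KafkaStreams constructor before starting the topology.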

answered 2018-06-19T16:57:06.717