我有一个关于使用 Kafka 和不同名称的主题(Kafka 代理)和主题(Schema Registry)设置流处理器的问题。
spring:
cloud:
schema-registry-client:
endpoint: http://localhost:8081
cached: true
stream:
function:
definition: process
default:
consumer:
use-native-decoding: true
producer:
use-native-encoding: true
header-mode: none
bindings:
process-in-0:
group: spring-boot-kafka
destination: abc.bla
consumer:
max-attempts: 3
process-out-0:
destination: def.bla
kafka:
binder:
auto-add-partitions: false
auto-create-topics: false
consumer-properties:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: http://localhost:8081
allow.auto.create.topics: false
auto.register.schemas: false
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
auto.register.schemas: false
brokers:
- localhost:9092
configuration:
allow.auto.create.topics: false
auto.register.schemas: false
application.id: "${spring.application.name}"
首先,Kafka 代理和模式注册表似乎一切正常,但如果处理器接收到事件,模式注册表魔法就会启动。
而不是将 abc 作为主题发送到模式注册表 abc.bla 将被发送。架构注册表回答未找到。
预期:localhost:8081/subjects/abc/versions 意外和错误:localhost:8081/subjects/abc.bla/versions
error_code 40401
message "Subject not found."
我想知道出了什么问题,因为单个生产者或消费者客户端似乎能够在没有显式配置的情况下从主题中识别出正确的主题名称。
这里是处理器代码:
@SpringBootApplication
@EnableSchemaRegistryClient
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<ABC, DEF> process() {
return Transformer::transform;
}
}
这是我认为可能是问题的堆栈跟踪:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: { a long schema }
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:298) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:368) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:453) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:254) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:444) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getId(SchemaRegistryClient.java:192) ~[kafka-schema-registry-client-7.0.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73) ~[kafka-avro-serializer-5.3.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.0.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.7.1.jar:na]
有人知道我如何配置io.confluent.kafka.serializers.KafkaAvroDeserializer或io.confluent.kafka.serializers.KafkaAvroSerializer更正吗?
非常感谢,马库斯