0

我有一个关于使用 Kafka 设置流处理器的问题:Kafka 代理上的主题(topic)名称与 Schema Registry 中的主题(subject)名称不一致。

# Spring Cloud Stream configuration for the `process` function, using the
# Kafka binder together with the Confluent Avro (de)serializers.
spring:
  cloud:
    schema-registry-client:
      endpoint: http://localhost:8081
      cached: true
    stream:
      function:
        definition: process
      default:
        # Native decoding/encoding: (de)serialization is left to the Kafka
        # (de)serializers configured under `kafka.binder` below.
        consumer:
          use-native-decoding: true
        producer:
          use-native-encoding: true
          header-mode: none
      bindings:
        # Input binding of `process`.
        process-in-0:
          group: spring-boot-kafka
          destination: abc.bla
          consumer:
            max-attempts: 3
        # Output binding of `process`.
        process-out-0:
          destination: def.bla
      kafka:
        binder:
          auto-add-partitions: false
          auto-create-topics: false
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://localhost:8081
            # The next two keys must line up with their siblings above;
            # a shallower indent makes the YAML mapping invalid.
            allow.auto.create.topics: false
            auto.register.schemas: false
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
            # Schemas are looked up in the registry instead of being registered,
            # so the subject the serializer asks for must already exist.
            auto.register.schemas: false
          brokers:
            - localhost:9092
          configuration:
            allow.auto.create.topics: false
            auto.register.schemas: false
            application.id: "${spring.application.name}"

首先,Kafka 代理和 Schema Registry 看起来一切正常;但一旦处理器接收到事件,Schema Registry 的自动查找逻辑就会介入。

发送给 Schema Registry 的 subject 不是预期的 abc,而是 abc.bla。Schema Registry 返回"未找到"。

预期:localhost:8081/subjects/abc/versions;实际(错误):localhost:8081/subjects/abc.bla/versions

error_code  40401
message "Subject not found."

我想知道哪里出了问题,因为单独的生产者或消费者客户端似乎无需显式配置,就能根据 topic 名称识别出正确的 subject 名称。

这里是处理器代码:

/**
 * Spring Boot entry point that also declares the {@code process} function bean.
 *
 * <p>The accompanying configuration names {@code process} as the stream function
 * definition and declares the bindings {@code process-in-0} and {@code process-out-0}.
 */
@SpringBootApplication
@EnableSchemaRegistryClient
public class Application {

    /**
     * Supplies the function bean named {@code process}.
     *
     * @return a function that maps each {@code ABC} to a {@code DEF} through
     *         {@code Transformer::transform}
     */
    @Bean
    public Function<ABC, DEF> process() {
        // Kept as a method reference so resolution of Transformer.transform is unchanged.
        final Function<ABC, DEF> transformation = Transformer::transform;
        return transformation;
    }

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

下面是我认为与该问题相关的堆栈跟踪:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: { a long schema }
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:298) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:368) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:453) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:254) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:444) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getId(SchemaRegistryClient.java:192) ~[kafka-schema-registry-client-7.0.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73) ~[kafka-avro-serializer-5.3.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.0.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.7.1.jar:na]

有人知道我该如何正确配置 io.confluent.kafka.serializers.KafkaAvroDeserializer 和 io.confluent.kafka.serializers.KafkaAvroSerializer 吗?

非常感谢,马库斯

4

1 回答 1

0

对不起,现在我可以回答我自己的问题了。

原因是公司内部的 TopicNameStrategy。

# Subject name strategy setting reported to resolve the "Subject not found" lookup;
# the value is a company-internal strategy class.
# NOTE(review): the parent key this nests under is not shown here — confirm.
# Presumably it belongs with the serializer/deserializer properties (Confluent
# serializers read `value.subject.name.strategy` / `key.subject.name.strategy`).
subject:
  name:
    strategy: CompanyInternalStrategy

配置之后,topic 与 subject 名称不一致的问题就解决了。我是在这里找到提示的。

于 2021-11-16T11:34:25.607 回答