I am trying to figure out how to test my Spring Cloud Stream Kafka Streams application.

The application looks like this:

Stream 1: topic1 > topic2
Stream 2: topic2 + topic3 (join) > topic4
Stream 3: topic4 > topic5
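
For context, the three streams are functional bindings. Below is a simplified sketch of how they could be wired up (the bean names match the bindings in the configuration further down, but the Test3/Test4/Test5 value types, the class name, and the mapping/join bodies are placeholders, not my real logic):

import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamDefinitions {

    // Stream 1: topic1 -> topic2 (bindings stream1-in-0 / stream1-out-0)
    @Bean
    public Function<KStream<Long, Test1>, KStream<String, Test2>> stream1() {
        return input -> input.map((key, value) ->
                KeyValue.pair(String.valueOf(key), new Test2())); // placeholder mapping
    }

    // Stream 2: topic2 joined with topic3 -> topic4
    @Bean
    public BiFunction<KStream<String, Test2>, KStream<String, Test3>, KStream<String, Test4>> stream2() {
        return (left, right) -> left.join(right,
                (test2, test3) -> new Test4(), // placeholder join result
                JoinWindows.of(Duration.ofMinutes(5)));
    }

    // Stream 3: topic4 -> topic5
    @Bean
    public Function<KStream<String, Test4>, KStream<String, Test5>> stream3() {
        return input -> input.mapValues(test4 -> new Test5()); // placeholder mapping
    }
}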

I tried different approaches, such as the TestChannelBinder, but that approach only works for simple functions, not for Kafka Streams and Avro.

I decided to use EmbeddedKafka together with the MockSchemaRegistryClient. I can produce to a topic and consume from that same topic (topic1) again, but I am not able to consume from topic2.
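
As far as I understand it, the mock:// pseudo-URL is resolved by the Confluent (de)serializers to a shared in-memory registry per scope name, so everything configured with the same scope should talk to the same MockSchemaRegistryClient instance. That instance can also be obtained in test code (the scope name "localtest" is just the one from my configuration):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;

// "localtest" is the scope part of mock://localtest; every serializer and
// deserializer configured with that URL shares this registry instance.
SchemaRegistryClient registry = MockSchemaRegistry.getClientForScope("localtest");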

In my test application.yaml I put the following configuration (for now I am only testing the first stream; once it works I want to extend it):

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1 # stream2;stream3 disabled for now
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
    kafka:
      binder:
        min-partition-count: 1
        replication-factor: 1
        auto-create-topics: true
        auto-add-partitions: true
      bindings:
        default:
          consumer:
            autoRebalanceEnabled: true
            resetOffsets: true
            startOffset: earliest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: mock://localtest
            specific.avro.reader: true

My test looks like this:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    private static final String INPUT_TOPIC = "topic1";

    private static final String OUTPUT_TOPIC = "topic2";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        // Point the binder at the embedded broker before the application context starts.
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @org.junit.Test
    public void testSendReceive() throws IOException {
        // Produce one Avro record to topic1, keyed by a Long.
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", LongSerializer.class);
        senderProps.put("value.serializer", SpecificAvroSerializer.class);
        senderProps.put("schema.registry.url", "mock://localtest");
        AvroFileParser fileParser = new AvroFileParser();
        DefaultKafkaProducerFactory<Long, Test1> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Long, Test1> template = new KafkaTemplate<>(pf, true);
        Test1 test1 = fileParser.parseTest1("src/test/resources/mocks/test1.json");

        template.send(INPUT_TOPIC, 123456L, test1);
        System.out.println("produced");

        // Consume the record back from topic1 to verify that producing worked.
        Map<String, Object> consumer1Props = KafkaTestUtils.consumerProps("testConsumer1", "false", embeddedKafka.getEmbeddedKafka());
        consumer1Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer1Props.put("key.deserializer", LongDeserializer.class);
        consumer1Props.put("value.deserializer", SpecificAvroDeserializer.class);
        consumer1Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<Long, Test1> cf = new DefaultKafkaConsumerFactory<>(consumer1Props);

        Consumer<Long, Test1> consumer1 = cf.createConsumer();
        consumer1.subscribe(Collections.singleton(INPUT_TOPIC));
        ConsumerRecords<Long, Test1> records = consumer1.poll(Duration.ofSeconds(10));
        consumer1.commitSync();

        System.out.println("records count?");
        System.out.println("" + records.count());

        Test1 fetchedTest1 = records.iterator().next().value();
        assertThat(records.count()).isEqualTo(1);
        System.out.println("found record");
        System.out.println(fetchedTest1.toString());

        // Consume from topic2, which the Kafka Streams application should have written to.
        Map<String, Object> consumer2Props = KafkaTestUtils.consumerProps("testConsumer2", "false", embeddedKafka.getEmbeddedKafka());
        consumer2Props.put("key.deserializer", StringDeserializer.class);
        consumer2Props.put("value.deserializer", TestAvroDeserializer.class);
        consumer2Props.put("schema.registry.url", "mock://localtest");

        DefaultKafkaConsumerFactory<String, Test2> consumer2Factory = new DefaultKafkaConsumerFactory<>(consumer2Props);
        Consumer<String, Test2> consumer2 = consumer2Factory.createConsumer();
        consumer2.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, Test2> records2 = consumer2.poll(Duration.ofSeconds(30));
        consumer2.commitSync();

        if (records2.iterator().hasNext()) {
            System.out.println("has next");
        } else {
            System.out.println("has no next");
        }
    }
}
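
One more detail about the test itself: consumer2 never sets auto.offset.reset, so it falls back to the default of latest and could in principle miss a record written before its subscription is established. To rule that out, it should probably be pinned to earliest like consumer1:

// consumer1 already does this; without it consumer2 defaults to "latest"
// and may skip records produced before the subscription is set up.
consumer2Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");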

When trying to consume and deserialize from topic2, I get the following exception:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 0
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:193) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:249) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:307) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:86) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.2.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.1.jar:na]

No messages get consumed. Judging by the stack trace (SourceNode.deserializeKey on a StreamThread), the deserialization already fails inside the Kafka Streams application itself, not in my test consumer.

So I tried to override the SpecificAvroSerde, register the schema directly, and use this deserializer:

public class TestAvroDeserializer<T extends org.apache.avro.specific.SpecificRecord>
        extends SpecificAvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    public TestAvroDeserializer() throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();

        // Register the schema for topic2's value subject up front, so that
        // lookups by schema id do not fail with "Cannot get schema from schema registry!".
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", new AvroSchema(test2Schema), 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }

    /**
     * For testing purposes only.
     */
    TestAvroDeserializer(final SchemaRegistryClient client) throws IOException, RestClientException {
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        client.register("test2-value", new AvroSchema(test2Schema), 1, 0);
        inner = new KafkaAvroDeserializer(client);
    }

    // Without these overrides the superclass uses its own KafkaAvroDeserializer
    // (backed by a different registry client), and "inner" is never used at all.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) inner.deserialize(topic, bytes);
    }

    @Override
    public void close() {
        inner.close();
    }
}
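
An alternative to subclassing that I am considering: configure a plain SpecificAvroDeserializer for consumer2 with the same mock:// scope as the application, so the test consumer and the Streams app would resolve schema ids against the same registry instance (assuming mock:// works as described above):

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

// Same scope as in application.yaml, so schema ids are looked up in the same
// in-memory registry that the serializers registered them with.
consumer2Props.put("value.deserializer", SpecificAvroDeserializer.class);
consumer2Props.put("schema.registry.url", "mock://localtest");
consumer2Props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");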

It does not work with this deserializer either. Does anyone have experience with this kind of test setup using EmbeddedKafka and the MockSchemaRegistry? Or should I take a different approach altogether?

I would be glad if someone could help. Thanks in advance.
