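I have a @StreamListener that accepts a @Payload String. To test this listener class, I wrote a JUnit test class that uses embedded Kafka. Running the test class fails with the error below.

Error

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.lang.String] for GenericMessage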


If I change the type of the @Payload parameter from String to byte[], my listener class picks up the message.
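As a stopgap while the conversion problem is open, a listener that receives byte[] can decode the payload by hand. The following is a minimal sketch, assuming the producer writes UTF-8 text (the test below uses StringSerializer, which defaults to UTF-8); the class PayloadDecoder and its decode method are hypothetical names, not part of my project or of Spring.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoder {

    // Hypothetical helper: turns the raw Kafka record value into text.
    // Assumes the bytes are UTF-8 encoded.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Same value the test sends with template.sendDefault("foo")
        byte[] raw = "foo".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw));
    }
}
```

This only works around the symptom; it does not explain why the binder fails to convert the payload to String on its own.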

Can someone help me understand what the problem is here? I suspect it is related to the Spring Cloud Stream configuration.


@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {
                "input",
                "output"})
public class TestUtils {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("input");
        template.sendDefault("foo");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "input_group",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("output"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
    }
}

This is what my application.yaml looks like:
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          input:
            consumer:
              enable-dlq: true
              dlq-name: output
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        output:
          destination: output
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        input:
          destination: input
          contentType: application/json
          partitioned: true
          group: input_group

2 Answers

I hit a similar error in my project: a developer had mocked the ObjectMapper. Check whether your test code does the same and, if so, remove the mock.

Answered 2019-10-01T09:52:59.040

Check whether you have mocked ObjectMapper, because a mocked ObjectMapper cannot convert byte[] to String.
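One plausible reading of why a mock leads to this error: an unstubbed Mockito mock returns null (or false) from every call, so a converter that relies on it produces no result, and the framework reports that as a failed conversion. The sketch below is a plain-Java toy model of that idea and an assumption on my part, not Spring's actual code; NullConverterDemo and resolve are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class NullConverterDemo {

    // Toy stand-in for a payload resolver: a null conversion result
    // is treated as "no converter could handle this payload".
    static String resolve(byte[] payload, Function<byte[], String> converter) {
        String result = converter.apply(payload);
        if (result == null) {
            throw new IllegalStateException(
                    "Cannot convert from [[B] to [java.lang.String]");
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] payload = "foo".getBytes(StandardCharsets.UTF_8);

        // A working converter decodes the bytes.
        Function<byte[], String> real = b -> new String(b, StandardCharsets.UTF_8);
        // Imitates an unstubbed mock: always returns null.
        Function<byte[], String> mocked = b -> null;

        System.out.println(resolve(payload, real));
        try {
            resolve(payload, mocked);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is what happens in your test, letting the Spring context create the real ObjectMapper instead of a mock should be enough for the String payload to reach the listener.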

Answered 2019-10-01T09:51:01.477