我有一个接受@Payload 字符串的@StreamListener。为了测试这个 Listener 类,我使用嵌入式 Kafka 编写了一个 Junit 类。运行测试类时出现错误
错误
错误 osihandler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException:GenericMessage 无法从 [[B] 转换为 [java.lang.String]
如果我将 @Payload 的 dataType 从 String 更改为 byte[] 消息将被我的侦听器类选中。
有人可以帮我知道这里的问题吗?我想这与云流配置有关。
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
topics = {
"input",
"output"})
public class TestUtils {
public static final String KEY_SERIALIZER = "key.serializer";
public static final String VALUE_SERIALIZER = "value.serializer";
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@BeforeEach
public void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
}
@Test
public void someTest() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(KEY_SERIALIZER, StringSerializer.class);
senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
template.setDefaultTopic("input");
template.sendDefault("foo");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"input_group",
"false",
this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", StringDeserializer.class);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("output"));
ConsumerRecords<String, String> records = consumer.poll(10_000);
consumer.commitSync();
Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
这是我的 application.yaml 的样子。
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
consumer:
enable-dlq: true
dlq-name: output
dlq-producer-properties:
retries: 1
binder:
brokers: ${spring.embedded.kafka.brokers}
replicationFactor: ${replication_factor:1}
autoCreateTopics: true
autoAddPartitions: true
configuration:
retries: 1
batch.size: 16384
linger.ms: 1
enable.idempotence: true
buffer.memory: 33554432
request.timeout.ms: 3000
transaction.timeout.ms: 3000
max.block.ms: ${kafka_max_block_time:5000}
max.poll.records: 80
poll.timeout: 10000
commit.retries: 1
commit.retry.interval: 1000
session.timeout.ms.config: 50000
shutdown.signal: INT,TERM
acks: "all"
bindings:
output:
destination: output
contentType: application/json
producer:
partitionCount: ${partition_count:1}
input:
destination: input
contentType: application/json
partitioned: true
group: input_group