我正在使用 Spring-cloud-stream 版本 3.0.4
我正在编写一个 JSON 合并器,它侦听多个流,将 JSON 存储在状态存储中,然后将其合并以生成输出 JSON。由于我的服务只是一个 JSON 聚合器,我不想将 JSONS 转换为 Java 对象。所以我想做的是将上游服务的 JSON 发布为 JsonNode。进行合并并将其发布到下游主题上。
要将 JsonNode 添加为受信任的包,我已经在我的应用程序类中声明了一个 bean,如下所示。
@Bean
public KafkaHeaderMapper customKafkaHeaderMapper()
{
DefaultKafkaHeaderMapper kafkaHeaderMapper = new DefaultKafkaHeaderMapper();
kafkaHeaderMapper.addTrustedPackages("com.fasterxml.jackson.databind.node");
return kafkaHeaderMapper;
}
并在我的 application.yml 中添加了以下条目
spring.application.name: stream-aggregator
spring.cloud.stream.bindings.formDataIn:
destination: form-data
contentType: application/json
spring.cloud.stream.kafka.streams.binder:
configuration:
default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.key.default.type: com.company.datamapper.domain.EmpUUID
spring.json.value.default.type: com.fasterxml.jackson.databind.JsonNode
commit.interval.ms: 1000
spring.cloud.stream.kafka.binder.headerMapperBeanName:
customKafkaHeaderMapper
但是,此配置不起作用,我收到以下错误。
Caused by: java.lang.IllegalArgumentException: The class 'com.fasterxml.jackson.databind.node.ObjectNode' is not in the trusted packages: [java.util, java.lang, com.fasterxml.jackson.databind]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:125) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-2.3.1.jar:?]
我有 2 个查询
- 如何解决上述错误。
- 我的方法是最优的还是我应该以不同的方式进行聚合。