I am upgrading an existing Spring Cloud Stream application to use the new Spring Cloud Function-based producer. The messages it produces are in Avro format.
Here is my entire setup:
spring:
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry.url:http://localhost:8081}
      bindings:
        info-out-0:
          destination: info
          producer:
            useNativeEncoding: true
          contentType: application/*+avro
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          transaction:
            transaction-id-prefix: info-tx-
            producer:
              configuration:
                retries: 2
                acks: all
                key:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                value:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                schema:
                  registry:
                    url: ${spring.cloud.stream.schema-registry-client.endpoint}
              useNativeEncoding: true
      function:
        definition: info
  kafka:
    bootstrap-servers: ${kafka.brokers:localhost:9092}
@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }
}
@Configuration
class KafkaProducerConfiguration {

    // Single-value processor that bridges InfoProducer to the "info" supplier below.
    @Bean
    fun infoMonoProcessor(): MonoProcessor<Message<*>> {
        return MonoProcessor.create<Message<*>>()
    }

    // Supplier named "info", matching the function definition and the info-out-0 binding.
    @Bean
    fun info(): Supplier<Mono<Message<*>>> {
        return Supplier { infoMonoProcessor() }
    }
}

@Component
class InfoProducer(@Qualifier("infoMonoProcessor") private val infoProcessor: MonoProcessor<Message<*>>) {

    @Transactional
    fun send(info: Info): Mono<Unit> {
        return Mono.fromCallable {
            val infoReceived = InfoReceived(info)
            // dateTime is not shown in this snippet.
            val message = MessageBuilder.withPayload(infoReceived)
                .setHeader(KafkaHeaders.TIMESTAMP, dateTime)
                .build()
            infoProcessor.onNext(message)
        }.subscribeOn(Schedulers.elastic())
    }
}
A REST endpoint receives some info and sends it to the output topic using InfoProducer:
@RestController
@RequestMapping("/api/v1/info")
class InfoRestController(private val infoProducer: InfoProducer) {

    @PostMapping
    @ResponseStatus(CREATED)
    fun registerInfo(@RequestBody info: Info): Mono<Unit> {
        return infoProducer.send(info)
    }
}
The problem is that I get this ugly exception:
java.lang.NullPointerException: null
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:601) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$4(BeanFactoryAwareFunctionRegistry.java:640) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at xxx.InfoProducer$send$1.call(InfoProducer.kt:48) ~[classes/:na]
at xxx.InfoProducer$send$1.call(InfoProducer.kt:25) ~[classes/:na]
at reactor.core.publisher.MonoCallable.call(MonoCallable.java:91) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2019-12-11 16:41:43.985 DEBUG 22622 --- [ elastic-2] o.s.http.codec.json.Jackson2JsonEncoder : [23177c31] Encoding [kotlin.Unit]
In BeanFactoryAwareFunctionRegistry, acceptedOutputMimeTypes is an empty array.
What is wrong here?