1

我正在升级现有的 Spring Cloud Stream 应用程序以使用新的 Spring Cloud Function 生产者。生成的消息使用 Avro 格式。

这是我的整个设置:

# Spring Cloud Stream / Kafka binder configuration for the Avro-producing application.
spring:
  cloud:
    stream:
      # Schema registry endpoint; falls back to http://localhost:8081 when
      # the `schema-registry.url` property is not set.
      schema-registry-client:
        endpoint: ${schema-registry.url:http://localhost:8081}
      bindings:
        # NOTE(review): presumably the output binding of the `info` function
        # declared under spring.cloud.function.definition below — confirm.
        info-out-0:
          destination: info
          producer:
            useNativeEncoding: true
          contentType: application/*+avro
      kafka:
        binder:
          # NOTE(review): default here is plain `localhost`, while
          # spring.kafka.bootstrap-servers below defaults to `localhost:9092` — confirm
          # both resolve to the same broker.
          brokers: ${kafka.brokers:localhost}
          transaction:
            transaction-id-prefix: info-tx-
            producer:
              configuration:
                retries: 2
                acks: all
                # Keys and values are both serialized with the Confluent
                # KafkaAvroSerializer, using RecordNameStrategy for subject names.
                key:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                value:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                schema:
                  registry:
                    # Reuses the endpoint defined under schema-registry-client above.
                    url: ${spring.cloud.stream.schema-registry-client.endpoint}
                # NOTE(review): useNativeEncoding is already set on the binding's
                # producer above; here it sits among the Kafka producer
                # configuration entries — confirm it is intended at this level.
                useNativeEncoding: true
    function:
      definition: info
  kafka:
    bootstrap-servers: ${kafka.brokers:localhost:9092}
/**
 * Spring configuration that exposes a [SchemaRegistryClient] bean.
 */
@Configuration
class SchemaRegistryConfiguration {

    /**
     * Creates a [ConfluentSchemaRegistryClient] and points it at the configured endpoint.
     *
     * @param endpoint value of the `spring.cloud.stream.schema-registry-client.endpoint` property
     * @return the client with its endpoint set
     */
    @Bean
    fun schemaRegistryClient(
        @Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String
    ): SchemaRegistryClient =
        ConfluentSchemaRegistryClient().also { registryClient -> registryClient.setEndpoint(endpoint) }
}
/**
 * Spring configuration that declares the processor used to hand messages to the
 * `info` supplier, and the supplier itself.
 */
@Configuration
class KafkaProducerConfiguration {

    /**
     * Processor that [info] exposes as its output.
     *
     * NOTE(review): a `MonoProcessor` is a `Mono`, which by type carries at most one
     * value — confirm this is intended for a producer that is invoked once per request.
     */
    @Bean
    fun infoMonoProcessor(): MonoProcessor<Message<*>> {
        return MonoProcessor.create<Message<*>>()
    }

    /**
     * Supplier bean named `info` that returns the result of [infoMonoProcessor].
     */
    @Bean
    fun info(): Supplier<Mono<Message<*>>> {
        return Supplier { infoMonoProcessor() }
    }
}
/**
 * Publishes [InfoReceived] messages by pushing them into the injected processor.
 *
 * @property infoProcessor the bean qualified as `infoMonoProcessor`; every built message is passed to it
 */
@Component
class InfoProducer(@Qualifier("infoMonoProcessor") private val infoProcessor: MonoProcessor<Message<*>>) {

    /**
     * Wraps [info] in an [InfoReceived] payload, builds a message with a Kafka
     * timestamp header and passes it to [infoProcessor].
     *
     * The work is deferred via `Mono.fromCallable` and subscribed on the elastic scheduler.
     *
     * @param info the incoming data to publish
     * @return a [Mono] that yields [Unit] after `onNext` has been called on the processor
     */
    @Transactional
    fun send(info: Info): Mono<Unit> {
        return Mono.fromCallable {
            val infoReceived = InfoReceived(info)
            // NOTE(review): `dateTime` is not declared anywhere in this snippet — confirm where it comes from.
            val message = MessageBuilder.withPayload(infoReceived)
                .setHeader(KafkaHeaders.TIMESTAMP, dateTime)
                .build()
            infoProcessor.onNext(message)
        }.subscribeOn(Schedulers.elastic())
    }

}

有一个 REST 端点接收一些信息,并使用 InfoProducer 将其发送到输出主题:

/**
 * REST controller mapped to `/api/v1/info` that forwards posted [Info] payloads to [InfoProducer].
 */
@RestController
@RequestMapping("/api/v1/info")
class InfoRestController(private val infoProducer: InfoProducer) {

    /**
     * Handles POST requests by delegating to [InfoProducer.send]; the response status is CREATED.
     *
     * @param info the request body
     * @return the [Mono] returned by the producer
     */
    @PostMapping
    @ResponseStatus(CREATED)
    fun registerInfo(@RequestBody info: Info): Mono<Unit> {
        return infoProducer.send(info)
    }
}

问题是我遇到了这个丑陋的异常:

java.lang.NullPointerException: null
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:601) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$4(BeanFactoryAwareFunctionRegistry.java:640) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at xxx.InfoProducer$send$1.call(InfoProducer.kt:48) ~[classes/:na]
    at xxx.InfoProducer$send$1.call(InfoProducer.kt:25) ~[classes/:na]
    at reactor.core.publisher.MonoCallable.call(MonoCallable.java:91) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2019-12-11 16:41:43.985 DEBUG 22622 --- [      elastic-2] o.s.http.codec.json.Jackson2JsonEncoder  : [23177c31] Encoding [kotlin.Unit]

在 BeanFactoryAwareFunctionRegistry 中,acceptedOutputMimeTypes 是一个空数组。

这里有什么问题?

4

1 回答 1

0

由于您使用的是本机编码和 Confluent 提供的 Avro 序列化程序,因此我认为这里不需要使用 Spring Cloud 模式注册表组件。您可以从设置中删除 SchemaRegistryConfiguration。您也不需要这个:

# Spring Cloud schema-registry client settings from the question's configuration.
spring:
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry.url:http://localhost:8081}

我还建议不要在这里使用 ${spring.cloud.stream.schema-registry-client.endpoint},而是使用更明确的值:

# Schema registry URL given as a literal value instead of a property placeholder.
schema:
  registry:
    url: http://localhost:8081

请参阅此示例

如果您仍然遇到问题,您能否分享一个可重现的样本?

于 2019-12-11T20:37:37.723 回答