0

我正在尝试根据此处的示例构建一个自定义 http 转换器。而kafka binder源代码来自这里

我在 SCDF 中使用 kafka 作为活页夹。场景是当调用 http 时,我在函数中解析消息并将 AVRO 消息放入 kafka。

当定义的函数如下所示时,示例应用程序可以正常工作。该应用程序运行良好,可以将 avro 消息转换并放入 kafka 主题,并且可以正常运行。

@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class DemoSourceTransformer {

    @Bean
    public Function<String, Event> transform() {
        return value -> {
            //transform message here
            return event;
        };
    }
}

要求是有一个附加的标头,用于基于标头的附加下游处理。我无法访问我拥有的引导版本中的标题。2.1.12。

我基于云流文档中的某些其他示例所做的更改是以这种方式定义功能

  @Bean
    public Function<Message<String>, Message<Event>> transform() {
        return value -> {
            //transform message here
            return MessageBuilder.withPayload(event)
                    .setHeader("custom_key", "custom_value")
                    .build();
        };
    }

但是这次消息没有转移到kafka主题,而是看到以下错误。

java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:76) ~[kafka-avro-serializer-5.3.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) ~[kafka-avro-serializer-5.3.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:489) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:177) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:146) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1925) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1799) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7957) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:86) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:92) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7957) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at org.springframework.integration.channel.MessageChannelReactiveUtils.lambda$null$0(MessageChannelReactiveUtils.java:67) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:413) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingEndpointSupport.actualDoHandleRequest(HttpRequestHandlingEndpointSupport.java:374) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingEndpointSupport.doHandleRequest(HttpRequestHandlingEndpointSupport.java:253) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingMessagingGateway.handleRequest(HttpRequestHandlingMessagingGateway.java:112) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter.handle(HttpRequestHandlerAdapter.java:53) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:88) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:209) ~[spring-security-web-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178) ~[spring-security-web-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:94) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:114) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:104) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:367) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:860) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1598) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

我目前正在使用 kafka avro 序列化器版本 5.3.1。除了 avro 有效负载之外,我是否需要使用任何其他版本或其他方式来确保我可以将标题添加到主题的消息中。

4

1 回答 1

0

不清楚 a MessageorEvent是什么,但错误表明它显然不是 Avro 类型。

Avro 需要一个模式。你的架构在哪里?

例如,您必须使用GenericRecord或创建使用Avro Maven 插件发送的值

如何访问 Kafka 消息头

这取决于 Spring 是否公开该信息。在基本的 Kafka Java 客户端中,这是ConsumerRecord

于 2020-04-30T06:12:35.513 回答