0

我正在使用以下应用程序使用带有 kafka-binding 的 spring-cloud-stream 进行练习:

spring:
  cloud:
    function:
      # Register both function beans; ';' separates multiple definitions
      definition: single;react
    stream:
      bindings:
        # Imperative function: single-in -> single() -> single-out
        single-in-0:
          destination: single-in
          group: single-in
        single-out-0:
          destination: single-out
        # Reactive function: react-in -> react() -> react-out
        react-in-0:
          destination: react-in
          group: react-in
        react-out-0:
          destination: react-out
      kafka:
        default:
          consumer:
            # Failed records are published to the DLQ topic after retries are exhausted
            enableDlq: true
            dlqName: dlq
        binder:
          brokers: 192.168.153.133:9092
          autoAddPartitions: true
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Imperative handler bound to {@code single-in-0}/{@code single-out-0}.
     * Throws for the "EXCP" payload (exercising the binder's retry/DLQ path);
     * any other payload is echoed back prefixed with "OK: ".
     */
    @Bean
    public Function<String, String> single() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                // fixed typo: "massage" -> "message"
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }

    /**
     * Reactive handler bound to {@code react-in-0}/{@code react-out-0}.
     * NOTE(review): an exception thrown inside map() terminates the whole Flux
     * and cancels the subscription; afterwards the binding has no subscriber,
     * so every later record fails with "Dispatcher has no subscribers" and is
     * routed to the DLQ. Handle errors inside the pipeline (e.g. onErrorResume)
     * if the stream must keep consuming.
     */
    @Bean
    public Function<Flux<String>, Flux<String>> react() {
        return fluxMsg -> fluxMsg.map(msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                // fixed typo: "massage" -> "message"
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        });
    }
}

如您所见,应用程序非常简单:如果收到的消息是“EXCP”,则抛出异常,否则发布“OK”消息。

我不清楚的是,为什么如果从“react-in”读取错误消息,那么从该主题读取的每条消息都进入 DLQ 主题。例如:

  1. 将 "test-1" 写入 "react-in" -> 在 "react-out" 中得到 "OK: test-1"
  2. 将 "test-2" 写入 "react-in" -> 在 "react-out" 中得到 "OK: test-2"
  3. 将 "EXCP" 写入 "react-in" -> 在 "dlq" 中得到 "EXCP"
  4. 将 "test-3" 写入 "react-in" -> 在 "dlq" 中得到 "test-3"

我本来希望最后一条消息发布在“react-out”主题而不是“dlq”主题中。这里的日志:

...
ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.153.133:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = react-in
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305550622
2020-08-13 09:59:10.623  INFO 17688 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-react-in-4, groupId=react-in] Subscribed to topic(s): react-in
2020-08-13 09:59:10.624  INFO 17688 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-08-13 09:59:10.631  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Successfully joined group with generation 8
2020-08-13 09:59:10.637  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Adding newly assigned partitions: single-in-0
2020-08-13 09:59:10.648  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Setting offset for partition single-in-0 to the committed offset FetchPosition{offset=18, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.658  INFO 17688 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@de8039f
2020-08-13 09:59:10.668  INFO 17688 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-react-in-4, groupId=react-in] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:10.676  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Discovered group coordinator 192.168.153.133:9092 (id: 2147483647 rack: null)
2020-08-13 09:59:10.677  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.699  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Finished assignment for group at generation 4: {consumer-react-in-4-7ca5b03b-bc58-4af0-b4e5-c0666fc2f05a=Assignment(partitions=[react-in-0])}
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Successfully joined group with generation 4
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Adding newly assigned partitions: react-in-0
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Setting offset for partition react-in-0 to the committed offset FetchPosition{offset=51, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : react-in: partitions assigned: [react-in-0]
2020-08-13 09:59:10.728  INFO 17688 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 4.473 seconds (JVM running for 5.039)
2020-08-13 09:59:10.753  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : single-in: partitions assigned: [single-in-0]
2020-08-13 09:59:23.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-3
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305563932
2020-08-13 09:59:23.944  INFO 17688 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:31.919  INFO 17688 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.react-in-0' has 0 subscriber(s).
2020-08-13 09:59:34.922 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}], failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

2020-08-13 09:59:34.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:34.929  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305574929
2020-08-13 09:59:34.943  INFO 17688 --- [ad | producer-4] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-4] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:39.770 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}], failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

有趣的是,“单一”函数的行为符合我的预期:

  1. 将 "test-1" 写入 "single-in" -> 在 "single-out" 中得到 "OK: test-1"
  2. 将 "test-2" 写入 "single-in" -> 在 "single-out" 中得到 "OK: test-2"
  3. 将 "EXCP" 写入 "single-in" -> 在 "dlq" 中得到 "EXCP"
  4. 将 "test-3" 写入 "single-in" -> 在 "single-out" 中得到 "OK: test-3"

有人可以解释一下:为什么使用 Reactor 实现时,之后的所有消息都被发布到 "dlq" 主题?以及 "Dispatcher has no subscribers"(调度程序没有订阅者)这个错误是什么意思?

谢谢

4

1 回答 1

0

失败的原因是:反应式流(Flux)在处理第一条抛出异常的事件时就已经因错误而终止,订阅被取消。此后从主题消费到的每条消息都找不到订阅者来处理,因此出现 "Dispatcher has no subscribers"(调度程序没有订阅者)错误,消息被路由到 DLQ。

  1. 使用 onErrorContinue

  /**
   * Reactive handler with in-pipeline error recovery: onErrorContinue drops
   * the failing element (after logging it) and lets the Flux keep consuming
   * subsequent records instead of terminating the subscription.
   * NOTE(review): onErrorContinue depends on operator support and is
   * discouraged by the Reactor documentation; a per-element onErrorResume is
   * the safer alternative — confirm against your Reactor version.
   */
  @Bean
  public Function<Flux<String>,Flux<String>> react() {
    return fluxMsg -> {
      return fluxMsg.map(msg -> {
        if ("EXCP".equalsIgnoreCase(msg)) {
          // fixed typo: "massage" -> "message"
          throw new RuntimeException("Received bad message");
        }
        return "OK: " + msg;
      }).onErrorContinue((throwable, o) -> logger.error(throwable.getMessage(), throwable));
    };
  }

  1. 删除所有消费者组和主题并重新启动应用程序
于 2021-10-01T10:06:13.730 回答