3

我正在使用 Spring Cloud Stream Kafka binder 来使用来自 Kafka 的消息。我可以使我的示例与单个 Kafka Binder 一起工作,如下所示

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <broker url>
      bindings:
        consumer:
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-other-topic
          producer:
            valueSerde: JsonSerde

请注意,这两个绑定都指向同一个 Kafka Broker。但是,我有一种情况,我需要发布到某个 Kafka 集群中的一个主题,并且还需要从另一个 Kafka 集群中的另一个主题消费。我应该如何更改我的配置才能绑定到不同的 Kafka 集群?

我尝试过这样的事情

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          binder: defaultbinder
          destination: some-topic
          producer:
            valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      kafka:
        bindings:
          consumer:
            binder: kafka1
            destination: some-topic
            group: testconsumergroup
            consumer:
              concurrency: 1
              valueSerde: JsonSerde
          producer:
            binder: defaultbinder
            destination: some-topic
            producer:
              valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>

但是他们两个似乎都不起作用。第一个配置似乎无效。对于第二个配置,我收到以下错误

Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka1,defaultbinder, and no default binder has been set.

我正在使用依赖项 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE' 和 Spring Boot 2.2.6

请让我知道如何使用 Spring Cloud Stream 为 Kafka 配置多个绑定

更新

在下面尝试了此配置

spring:
  cloud:
    stream:
      binders:
        kafka2:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
      bindings:
        consumer:
          destination: <some-topic>
          binder: kafka1
          group: testconsumergroup
          content-type: application/json
          nativeEncoding: true
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-topic
          binder: kafka2
          contentType: application/json
          nativeEncoding: true
          producer:
            valueSerde: JsonSerde

消息流和 EventHubBinding 如下

public interface MessageStreams {
  String PRODUCER = "producer";
  String CONSUMER = "consumer;

  @Output(PRODUCER)
  MessageChannel producerChannel();

  @Input(CONSUMER)
  SubscribableChannel consumerChannel()
}

@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}

我的 Producer 类如下所示

@Component
@Slf4j
public class EventPublisher {
  private final MessageStreams messageStreams;

  public EventPublisher(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  public boolean publish(CustomMessage event) {
    MessageChannel messageChannel = getChannel();
    MessageBuilder messageBuilder = MessageBuilder.withPayload(event);
    boolean messageSent = messageChannel.send(messageBuilder.build());
    return messageSent;
  }

  protected MessageChannel getChannel() {
    return messageStreams.producerChannel();
  }
}

消费者类如下所示

@Component
@Slf4j
public class EventHandler {
  private final MessageStreams messageStreams;

  public EventHandler(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  @StreamListener(MessageStreams.CONSUMER)
  public void handleEvent(Message<CustomMessage> message) throws Exception 
  {
    // process the event
  }

  @Override
  @ServiceActivator(inputChannel = "some-topic.testconsumergroup.errors")
  protected void handleError(ErrorMessage errorMessage) throws Exception {
    // handle error;
  }
}

尝试发布和使用测试中的消息时出现以下错误。

Dispatcher has no subscribers for channel 'application.producer'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[104], headers={contentType=application/json, timestamp=1593517340422}]

我错过了什么吗?对于单个集群,我能够发布和使用消息。该问题仅发生在多个集群绑定中

4

0 回答 0