I am using the Spring Cloud Stream Kafka binder to consume messages from Kafka. I can get my sample working with a single Kafka binder, as shown below:
spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <broker url>
      bindings:
        consumer:
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-other-topic
          producer:
            valueSerde: JsonSerde
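Note that both bindings point to the same Kafka broker. However, I have a situation where I need to publish to a topic in one Kafka cluster and also consume from another topic in a different Kafka cluster. How should I change my configuration to bind to different Kafka clusters?
I tried something like this: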
spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          binder: defaultbinder
          destination: some-topic
          producer:
            valueSerde: JsonSerde
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>
and
spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      kafka:
        bindings:
          consumer:
            binder: kafka1
            destination: some-topic
            group: testconsumergroup
            consumer:
              concurrency: 1
              valueSerde: JsonSerde
          producer:
            binder: defaultbinder
            destination: some-topic
            producer:
              valueSerde: JsonSerde
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>
But neither of them seems to work. The first configuration appears to be invalid. For the second configuration, I get the following error:
Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka1,defaultbinder, and no default binder has been set.
I am using the dependency 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE' with Spring Boot 2.2.6.
Please let me know how to configure multiple bindings for Kafka using Spring Cloud Stream.
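For context on the error above: it says that a binding asked for the default binder while two binders (kafka1 and defaultbinder) were registered and none was marked as default. Below is a minimal sketch of how a multi-binder setup is usually declared. It has not been verified against 3.0.1.RELEASE, and it rests on several assumptions: each binding selects its binder through spring.cloud.stream.bindings.<name>.binder; the spring.cloud.stream.default-binder property (defaultBinder in the reference documentation) covers any binding that does not name one; and the message-channel Kafka binder reads its brokers from spring.cloud.stream.kafka.binder.brokers rather than from the kafka.streams.binder prefix, which belongs to the Kafka Streams binder. The binder names kafka1 and kafka2 and the choice of kafka1 as the default are illustrative only; the broker placeholders are the ones used in this question.

```yaml
spring:
  cloud:
    stream:
      # Assumption: kafka1 serves any binding that does not name a binder explicitly.
      default-binder: kafka1
      binders:
        kafka1:
          type: kafka
          environment:
            # Message-channel Kafka binder prefix (not kafka.streams.binder).
            spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
        kafka2:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1            # consume from cluster 1
          destination: some-topic
          group: testconsumergroup
        producer:
          binder: kafka2            # publish to cluster 2
          destination: some-other-topic
```

The bindings block sits directly under spring.cloud.stream here, not under spring.cloud.stream.kafka; the latter location holds only Kafka-specific extensions and would not be expected to carry the binder, destination, or group settings.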
Update
I tried the configuration below:
spring:
  cloud:
    stream:
      binders:
        kafka2:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
      bindings:
        consumer:
          destination: <some-topic>
          binder: kafka1
          group: testconsumergroup
          content-type: application/json
          nativeEncoding: true
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-topic
          binder: kafka2
          contentType: application/json
          nativeEncoding: true
          producer:
            valueSerde: JsonSerde
The message streams interface and the EventHubBinding configuration are as follows:
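import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MessageStreams {
    // Binding names; they must match the keys under spring.cloud.stream.bindings
    String PRODUCER = "producer";
    String CONSUMER = "consumer";

    @Output(PRODUCER)
    MessageChannel producerChannel();

    @Input(CONSUMER)
    SubscribableChannel consumerChannel();
}

@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}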
My producer class looks like this:
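import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class EventPublisher {
    private final MessageStreams messageStreams;

    public EventPublisher(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
    }

    // Wraps the event in a Message and sends it on the "producer" binding
    public boolean publish(CustomMessage event) {
        MessageChannel messageChannel = getChannel();
        Message<CustomMessage> message = MessageBuilder.withPayload(event).build();
        return messageChannel.send(message);
    }

    protected MessageChannel getChannel() {
        return messageStreams.producerChannel();
    }
}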
The consumer class looks like this:
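import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class EventHandler {
    private final MessageStreams messageStreams;

    public EventHandler(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
    }

    @StreamListener(MessageStreams.CONSUMER)
    public void handleEvent(Message<CustomMessage> message) throws Exception {
        // process the event
    }

    // Error channel name follows <destination>.<group>.errors
    // (@Override removed: the class shown here extends nothing, so there is no method to override)
    @ServiceActivator(inputChannel = "some-topic.testconsumergroup.errors")
    protected void handleError(ErrorMessage errorMessage) throws Exception {
        // handle error
    }
}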
I get the following error when trying to publish and consume messages in a test:
Dispatcher has no subscribers for channel 'application.producer'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[104], headers={contentType=application/json, timestamp=1593517340422}]
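Am I missing something? With a single cluster, I am able to publish and consume messages. The problem only occurs with bindings to multiple clusters.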