文档非常直接,建议公开 KafkaBindingRebalanceListener 类型的 Bean,并且将在内部调用 onPartitiosnAssigned 方法。我正在尝试做同样的事情,并且在 spring 框架创建其 KafkaMessageChannelBinder Bean 时,ObjectProvider.getIfUnique() 总是返回 null,因为它无法找到所需的 bean。似乎当应用程序启动 SpringFramework 时首先创建它的 Beans 并且由于它尚未创建而无法找到 Rebalance Listener Bean。以下是项目中的三个代码片段。如果我缺少任何指示应用程序在进入 Spring Framework 之前首先在应用程序包中创建 Bean 的内容,请提供帮助。
再平衡监听器
package io.spring.dataflow.sample.seekoffset.config;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
@Component
public class KafkaRebalanceListener implements KafkaBindingRebalanceListener {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
logger.debug("onPartitionsAssigned");
}
}
配置类
package io.spring.dataflow.sample.seekoffset.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
@EnableBinding(Sink.class)
public class SeekOffsetConfig {
Logger logger = LoggerFactory.getLogger(SeekOffsetConfig.class);
@StreamListener(Sink.INPUT)
public void receiveMessage(Message<String> message) {
logger.debug("receiveMessage()");
}
}
应用类
package io.spring.dataflow.sample.seekoffset;
import io.spring.dataflow.sample.seekoffset.config.KafkaRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
public class SeekOffsetApplication {
Logger logger = LoggerFactory.getLogger(SeekOffsetApplication.class);
public static void main(String[] args) {
SpringApplication.run(SeekOffsetApplication.class, args);
}
}