更新:我误解了 OP 的问题,从“如何检查拓扑是否已完成将输入主题具体化到状态存储”到“状态存储恢复过程”
REBALANCING
只有当 KafkaStreams 的状态从 更改为状态时,您才能从 KafkaStreams 实例中获取 KeyValueStore RUNNING
。您可以使用 a 来检查此状态转换,StreamsBuilderFactoryBeanCustomizer
以访问底层 KafkaStreams 实例。如果您只想检查所有状态存储何时已完全填充以及 kafka 流线程何时准备就绪,那么您可以获得一个KeyValueStore
您可以监听的StateListener
:
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
// set flag that `stateStore` store of current KafkaStreams has been fully restore
// then you can get
}
}
}
或者如果您想从KafkaStreams
实例中获取商店
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
//get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//and set flag that `stateStore` store of current KafkaStreams has been fully restore
}
});
});
}
在文档中阅读更多内容。
请注意,应该只有一个 StreamsBuilderFactoryBeanCustomizer 实例。