3

我有一个紧凑的主题,大约有 30 个 Mio 键。我App将此主题具体化为KeyValueStore.

如何检查是否KeyValueStore已完全填充?如果我通过查找密钥,InteractiveQuery我需要知道密钥是否不存在,因为它还StateStore没有准备好,或者密钥是否确实不存在。

我以这种方式实现 StateStore:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }
4

2 回答 2

3

一般来说,不存在“完全加载”这样的事情,因为在应用程序启动后的任何时间点,新数据都可能写入输入主题,并且将读取这些新数据以更新相应的表。

您可以做的是监控消费者滞后:在您的应用程序中KafkaStreams#metrics(),您可以访问所有客户端(即消费者/生产者)和 Kafka Streams 指标。消费者公开了一个records-lag-max可能有帮助的指标。

当然,在正常处理过程中(假设一直在向输入主题写入新数据)消费者延迟会一直上下波动。

于 2020-03-13T09:42:07.177 回答
-1

更新:我误解了 OP 的问题,从“如何检查拓扑是否已完成将输入主题具体化到状态存储”到“状态存储恢复过程”

REBALANCING只有当 KafkaStreams 的状态从 更改为状态时,您才能从 KafkaStreams 实例中获取 KeyValueStore RUNNING。您可以使用 a 来检查此状态转换,StreamsBuilderFactoryBeanCustomizer以访问底层 KafkaStreams 实例。如果您只想检查所有状态存储何时已完全填充以及 kafka 流线程何时准备就绪,那么您可以获得一个KeyValueStore您可以监听的StateListener

@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // set flag that `stateStore` store of current KafkaStreams has been fully restore
            // then you can get
        }
    }
}

或者如果您想从KafkaStreams实例中获取商店

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                //get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                //and set flag that `stateStore` store of current KafkaStreams has been fully restore
            }
        });
    });
}

在文档中阅读更多内容

请注意,应该只有一个 StreamsBuilderFactoryBeanCustomizer 实例。

于 2020-03-12T09:52:58.643 回答