0

我在 Java 应用程序(Spring Cloud Stream)中使用 Kafka Streams API。我有一个特殊的用例,如下:

  • 我的应用程序将从主题 A 消费,并从主题 B 产生和消费。
  • 对于主题 A 上的每条消息,都会为主题 B 生成一组对应的消息,应用程序使用这些消息来跟踪内部状态变化。它使用来自主题 B 的 KStream 来将该状态具体化为可查询的存储。

由于应用程序的多个实例将运行,并且无法保证将任一主题的哪些特定分区分配给实例,因此必须在应用程序之间共享状态存储。否则,如果主题 B 发生重新平衡,则应用程序实例可能会丢失它们正在跟踪的主题 A 消息的状态信息。考虑以下场景:

  • 实例 1 具有主题 A 的分区 1 和主题 B 的分区 1。
  • 发生主题 B 的分区重新平衡。
  • 实例 1 现在具有主题 A 的分区 1(未更改),但具有主题 B 的分区 2。
  • 实例 1 现在无法访问它在为主题 B 拥有分区 1 时创建的状态存储中的数据。

如果仅针对主题 A 进行再平衡,则会发生相同的情况。

是否有可能实现为“全球状态存储”?我了解 GlobalKTable 的概念,但我需要使用 KStream 抽象,因为我需要访问完整的事件流。作为参考,我的 KStream 消费者如下:

    @StreamListener(INPUT_TOPIC)
    public void consumeKStream(KStream<String, Pojo> kStream) {
        kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
            map.put(value.getFoo(), value.getBar()); return map;
        }, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
                .withKeySerde(keySerde).withValueSerde(valueMapSerde));
    }
4

1 回答 1

0

如果您从主题 A 和主题 B 中读取数据,并且您拥有从主题 B 实现数据并在存储中查找主题 A 记录的拓扑,那么您将保证实例获得共同分区的分配。因此,您描述的场景永远不会发生。

您可以通过检查包含子拓扑的您Topology(通过)来验证这一点。describe()子拓扑作为任务执行,并且任务具有保证的共同分区输入主题分配。

比较:https ://docs.confluent.io/current/streams/architecture.html#parallelism-model

于 2020-02-23T08:46:25.373 回答