我在 Java 应用程序(Spring Cloud Stream)中使用 Kafka Streams API。我有一个特殊的用例,如下:
- 我的应用程序将从主题 A 消费,并从主题 B 产生和消费。
- 对于主题 A 上的每条消息,都会为主题 B 生成一组对应的消息,应用程序使用这些消息来跟踪内部状态变化。它使用来自主题 B 的 KStream 来将该状态具体化为可查询的存储。
由于应用程序的多个实例将运行,并且无法保证将任一主题的哪些特定分区分配给实例,因此必须在应用程序之间共享状态存储。否则,如果主题 B 发生重新平衡,则应用程序实例可能会丢失它们正在跟踪的主题 A 消息的状态信息。考虑以下场景:
- 实例 1 具有主题 A 的分区 1 和主题 B 的分区 1。
- 发生主题 B 的分区重新平衡。
- 实例 1 现在具有主题 A 的分区 1(未更改),但具有主题 B 的分区 2。
- 实例 1 现在无法访问它在为主题 B 拥有分区 1 时创建的状态存储中的数据。
如果仅针对主题 A 进行再平衡,则会发生相同的情况。
是否有可能实现为“全球状态存储”?我了解 GlobalKTable 的概念,但我需要使用 KStream 抽象,因为我需要访问完整的事件流。作为参考,我的 KStream 消费者如下:
@StreamListener(INPUT_TOPIC)
public void consumeKStream(KStream<String, Pojo> kStream) {
kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
map.put(value.getFoo(), value.getBar()); return map;
}, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
.withKeySerde(keySerde).withValueSerde(valueMapSerde));
}