是否有一个示例说明如何使用 Spring Cloud 流和使用函数方法创建 GlobalKTable 以保持 KStream 的计数?
2 回答
0
根据文档GlobalKTables 是只读的,您不能在处理过程中修改全局表。
由于 GlobalKTables 是 Kafka 主题的消费者,因此您可以将数据发送到 GlobalKTable 的源主题,最终将其添加到表中。但是您不能确定 GlobalKTable 是否会立即更新。
于 2020-07-28T15:08:38.347 回答
0
实现处理器接口是正确的方法吗?
@Bean
public Consumer<KStream<String, Long>> processorsample() {
return input -> input.process(() -> new Processor<String, Long>() {
@Override
public void init(ProcessorContext context) {
if (state == null) {
state = (KeyValueStore<String, Long>) context.getStateStore("mystate");
}
}
@Override
public void process(String key, Long value) {
if (state != null) {
if (key != null) {
Long currentCount = state.get(key);
if (currentCount == null) {
state.put(key, value);
} else {
state.put(key, currentCount + value);
}
}
}
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "mystate");
}
于 2020-03-29T13:50:05.437 回答