我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过 kafka 密钥在状态存储上使用交互式查询调用确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
kafka 流目录大小
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更改日志主题中删除记录。