Let me start in a general way, in case I'm missing a concept: I have a streaming Flink job from which I created a savepoint. A simplified version of that job looks like this
Pseudocode:
val flink = StreamExecutionEnvironment.getExecutionEnvironment
// bounded file source for the batch run, Kafka for the streaming run
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName) // placeholder for the Kafka source setup
}
// keyBy returns a new stream, so the processor is applied to its result
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())
CassandraSink.addSink(processed)
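The processor relies on per-key state; here is a hypothetical sketch of what I mean by that (the Event type, field names and state name are placeholders, not the real implementation):
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Placeholder element type, only for this sketch
case class Event(key: String, payload: String)

// Sketch of ProcessorWithKeyedState: per-key ValueState that the savepoint
// should carry over when the job is restored
class ProcessorWithKeyedState extends KeyedProcessFunction[String, Event, Event] {
  @transient private var lastSeen: ValueState[Event] = _

  override def open(parameters: Configuration): Unit = {
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[Event]("last-seen", classOf[Event]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    lastSeen.update(event) // keyed state written per record
    out.collect(event)
  }
}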
As long as I run the job without a savepoint, it works fine. If I start the job from the savepoint, I get an exception that looks like this
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
I can work around this by setting the option:
execution.batch-state-backend.enabled: false
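In case it matters, a minimal sketch of passing the same option per job via a Configuration instead of flink-conf.yaml (assuming Flink 1.12+; I'm not certain the per-job form is treated the same way):
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// per-job variant of execution.batch-state-backend.enabled: false
val conf = new Configuration()
conf.setString("execution.batch-state-backend.enabled", "false")
val flink = StreamExecutionEnvironment.getExecutionEnvironment(conf)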
But with the batch state backend disabled, I eventually run into another error:
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
Of course, I tried setting the config key taskmanager.memory.managed.consumer-weights (using DATAPROC:70,PYTHON:30), but that doesn't seem to have any effect.
So I'm wondering whether I have a conceptual misunderstanding and a savepoint from a streaming job simply cannot be reused in a batch job, or whether there is a problem with my configuration. Any hints?