Let me start in a generic way to see whether I'm missing a concept: I have a streaming Flink job from which I created a savepoint. A simplified version of the job looks like this.

Pseudocode:

val flink = StreamExecutionEnvironment.getExecutionEnvironment
val stream = if (batchMode) {
  flink.readFile(path)
}
else {
  flink.addKafkaSource(topicName)
} 

// keyBy returns a new KeyedStream, so the calls have to be chained
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())

CassandraSink.addSink(processed)

As long as I run the job without a savepoint, it works fine. If I start the job from the savepoint, I get an exception that looks like this:

Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
    at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
    at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)

I can work around this by setting the option:

execution.batch-state-backend.enabled: false

But that just leads to the next error:

Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)

Of course I tried setting the config key taskmanager.memory.managed.consumer-weights (I used DATAPROC:70,PYTHON:30), but that did not seem to have any effect.
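For clarity, this is the combination I ended up with in flink-conf.yaml (a sketch of my attempt, not a known-good configuration; the consumer names DATAPROC and PYTHON are simply the ones I guessed at, and other Flink versions may expect different consumer names):

```yaml
# flink-conf.yaml (sketch of the attempted workaround)
execution.batch-state-backend.enabled: false
# weights are relative shares of managed memory per consumer type
taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
```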

So I wonder whether I have a conceptual error and it is simply not possible to reuse a savepoint from a streaming job in a batch job, or whether there is a problem in my configuration. Any hints?

1 Answer

After a hint from the Flink user group, it turned out that a savepoint from a streaming job cannot be reused in batch execution mode (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/execution_mode/#state-backends--state). So instead of running the job in batch mode (flink.setRuntimeMode(RuntimeExecutionMode.BATCH)), I run it in the default execution mode (STREAMING). This has the minor downside that the job will run forever and has to be stopped by someone once all data has been processed.
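In the pseudocode style from the question, the workaround amounts to the following (a sketch; the file source is assumed to be bounded, so the STREAMING job still reads it to the end, it just never terminates on its own):

```
val flink = StreamExecutionEnvironment.getExecutionEnvironment
// deliberately NOT calling flink.setRuntimeMode(RuntimeExecutionMode.BATCH):
// the default STREAMING mode uses a checkpointable state backend,
// so the savepoint from the original streaming job can be restored
val stream = if (batchMode) {
  flink.readFile(path)   // bounded source, but still processed as a stream
} else {
  flink.addKafkaSource(topicName)
}
```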

answered 2021-08-12T05:35:54.123