我正在尝试处理大量数据流(源 = Kinesis 流)并沉入 Postgres DB。这样做时,我需要首先将传入流与一些已经存在于 Postgres DB 中的主数据连接起来。
我正在从传入的 kinesis 流创建一个键控流,并使用 JDBC 目录使用 Flink 表 API 创建第二个流。我已按如下方式设置了我的数据库接收器:
public class PosgresSink extends RichSinkFunction <Clazz> implements CheckpointedFunction, CheckpointListener { .. }
这样每次 Flinks 进行检查点接收器时都会被触发。
但是,当我与来自 JDBC 源的传入流进行连接时,我得到以下信息:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint triggering task Source TableSourceScan(table ...
...
(1/4) of job 3961ccdf0d2a7da504f61e094a74fa5f is not in state RUNNING but FINISHED instead. Aborting checkpoint
这阻塞了我的接收器,因为检查点每次都会中止。
看来我的 JDBC 源代码很早就完成了,当 Flink 尝试检查点时,它没有找到任何正在运行的作业并中止检查点。Flink 似乎有一个限制,它仅在所有操作员/任务仍在运行时才检查点
https://issues.apache.org/jira/browse/FLINK-2491
我正在按如下方式设置我的 JDBC 流:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
// Register a JDBC catalog
static JdbcCatalog registerJdbcCatalog(StreamTableEnvironment bsTableEnv) {
String name = "<>";
String defaultDatabase = "<>";
String username = "<>";
String password = "<>";
String baseUrl = "jdbc:postgresql://localhost:5432/";
JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
bsTableEnv.registerCatalog("catalogName", jdbcCatalog);
bsTableEnv.useCatalog("catalogName");
return jdbcCatalog;
}
// get the table
Table table= bsTableEnv.from("table")
// create a data stream from table
DataStream<Table> myStream= bsTableEnv.toAppendStream(table, Table.class);
这是正确的理解吗?这里有办法吗?