1

我正在尝试处理大量数据流(源 = Kinesis 流)并沉入 Postgres DB。这样做时,我需要首先将传入流与一些已经存在于 Postgres DB 中的主数据连接起来。

我正在从传入的 kinesis 流创建一个键控流,并使用 JDBC 目录使用 Flink 表 API 创建第二个流。我已按如下方式设置了我的数据库接收器:

  public class PosgresSink extends RichSinkFunction <Clazz> implements CheckpointedFunction, CheckpointListener { .. }

这样每次 Flinks 进行检查点接收器时都会被触发。

但是,当我与来自 JDBC 源的传入流进行连接时,我得到以下信息:

   org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source TableSourceScan(table ...
    ...
   (1/4) of job 3961ccdf0d2a7da504f61e094a74fa5f is not in state RUNNING but FINISHED instead. Aborting checkpoint

这阻塞了我的接收器,因为检查点每次都会中止。

看来我的 JDBC 源代码很早就完成了,当 Flink 尝试检查点时,它没有找到任何正在运行的作业并中止检查点。Flink 似乎有一个限制,它仅在所有操作员/任务仍在运行时才检查点

https://issues.apache.org/jira/browse/FLINK-2491

我正在按如下方式设置我的 JDBC 流:

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

// Register a JDBC catalog 

static JdbcCatalog registerJdbcCatalog(StreamTableEnvironment bsTableEnv)   {
   String name = "<>";
   String defaultDatabase = "<>";
   String username = "<>";
   String password = "<>";
   String baseUrl = "jdbc:postgresql://localhost:5432/";

   JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
   bsTableEnv.registerCatalog("catalogName", jdbcCatalog);
   bsTableEnv.useCatalog("catalogName");
   return jdbcCatalog;
 }

// get the table
Table table= bsTableEnv.from("table")

// create a data stream from table
DataStream<Table> myStream= bsTableEnv.toAppendStream(table, Table.class);

这是正确的理解吗?这里有办法吗?

4

0 回答 0