1

我正在测试 NiFi 以替换我们当前的摄取设置,该设置从表的多个 MySQL 分片导入数据并将其存储在 HDFS 中。

我正在使用GenerateTableFetchExecuteSQL实现这一目标。

每个传入的流文件都有一个database.name属性,用于DBCPConnectionPoolLookup选择相关的分片。

问题是,假设我有 2 个分片要从中提取数据,shard_1对于shard_2表帐户,我也有updated_atas Maximum Value Columns,它没有为table@updated_at每个分片存储状态。状态中的每个表只有 1 个条目。

当我检查 Data Provenance 时,我看到 shard_2 流文件文件在没有传递给 ExecuteSQL 的情况下被删除。我的猜测是,这是因为 shard_1 查询首先执行,然后当 shard_2 查询到来时,它的记录会与 shard_1 的 updated_at 进行检查,并且由于它返回空,它会删除文件。

有没有人遇到过这个问题?还是我错过了什么?

4

1 回答 1

2

通过 DBCPConnectionPoolLookup 选择不同数据库的功能是在将状态存储在数据库获取处理器(例如,QueryDatabaseTable、GenerateTableFetch)的方案之后添加的。此外,获取数据库名称在 RDBMS 驱动程序之间有所不同,它可能在 DatabaseMetaData 或 ResultSetMetaData 中,可能在 getCatalog() 或 getSchema() 中,或者两者都没有。

我编写了 NIFI-5590来涵盖这一改进。

于 2018-09-12T16:51:39.857 回答