我正在测试 NiFi 以替换我们当前的摄取设置,该设置从表的多个 MySQL 分片导入数据并将其存储在 HDFS 中。
我正在使用GenerateTableFetch
并ExecuteSQL
实现这一目标。
每个传入的流文件都有一个database.name
属性,用于DBCPConnectionPoolLookup
选择相关的分片。
问题是,假设我有 2 个分片要从中提取数据,shard_1
对于shard_2
表帐户,我也有updated_at
as Maximum Value Columns
,它没有为table@updated_at
每个分片存储状态。状态中的每个表只有 1 个条目。
当我检查 Data Provenance 时,我看到 shard_2 流文件文件在没有传递给 ExecuteSQL 的情况下被删除。我的猜测是,这是因为 shard_1 查询首先执行,然后当 shard_2 查询到来时,它的记录会与 shard_1 的 updated_at 进行检查,并且由于它返回空,它会删除文件。
有没有人遇到过这个问题?还是我错过了什么?