2

我的用例是这样的。我有一些 X 表要从 MySQL 中提取。我正在拆分它们SplitText以将每个表放在一个单独的流文件中并使用GenerateTableFetchand拉取ExecuteSQL

当所有表的导入完成时,我希望得到通知或采取其他措施。在SplitText文本处理器中,我已将original关系路由到Waiton ${filename}with target count ${fragment.count}。这将跟踪完成了多少张表。

但是现在我无法弄清楚如何知道特定表何时完成。GenerateTableFetch根据分区大小将流文件分成多个。但它没有写入像 fragment.count 这样的属性,我可以用它来等待每个表。

有没有办法可以做到这一点?或者,是否有办法在整个流程结束时知道流程中的所有流程文件是否都已处理,并且队列中没有任何内容或正在处理?

4

2 回答 2

3

如果您有一个独立的 NiFi 实例(或者没有在集群之间将流文件分发到 ExecuteSQL 节点),那么您可以改用 QueryDatabaseTable,它(默认情况下)只会在处理整个结果集时发布所有流文件。如果您将所有行放入单个流文件中,则流文件已向下游传输的事实表明获取已完成。

我编写了 NIFI-5601来涵盖向 GTF 生成的流文件添加 fragment.* 属性的改进。

于 2018-09-15T17:36:16.327 回答
2

直到 NiFi 添加对此的支持,我设法使用MergeContent. 使用 table_name 作为Correlation attribute name,然后使用mergedWait处理器的关系${merge.count}作为目标。如果有人想做同样的事情,请参考屏幕截图。

在此处输入图像描述

合并内容处理器

等待处理器

于 2018-09-18T18:43:01.623 回答