2

如何在不更改转换存储库中的语义版本的情况下强制特定数据集以非增量方式构建?

有关我们特定用例的详细信息:

我们有大约 50 个数据集,由单个增量 python 通过手动注册和 for 循环定义。此转换的输入可以是 100 到 10000 个小 gzip 文件,因此当运行较大的数据集时,它最终会将所有这些分区为少数几个大小合适的 parquet 文件,这对于我们的下游工作来说是完美的。但是,在此作业已增量运行数月后(文件每小时到达),输出中还会有大量小型 parquet 文件。我们希望能够强制对单个数据集进行快照构建,而不必改变转换的语义版本,这将触发所有 50 个数据集的快照构建。这可能吗?

我了解一种潜在的解决方法可能是在转换本身中定义“最大输出文件”,读取现有输出中的当前文件数,并在当前超过最大值时强制执行快照。然而,由于这个管道是时间敏感的(需要在一个小时内运行),这会给管道带来一定程度的不可预测性,因为快照构建需要更长的时间。我们希望能够将这些完整的快照构建设置为每月在周末运行一次。

4

4 回答 4

3

在输出数据集上提交一个空的追加事务。

于 2020-09-24T21:40:07.197 回答
1

我认为您只需在运行时决定是否TransformOutput.set_mode()在输出中使用replacemodify. 这样,您可以根据输入的大小来决定是否要覆盖或附加到输出

于 2020-09-25T13:46:26.400 回答
1

这些天我首选的方法是使用我所说的“快照数据集”。这种方法允许您在任意点将快照事务注入管道,以及定期安排快照构建,这对于保持长期低延迟管道的性能非常有用。

为此,我在声明我的转换时使用了一个包装器(在我的例子中是 java 转换,但它同样适用于 python),它为我的转换添加了一个额外的输入。

假设您从读取数据集并生成数据集的A转换B开始C。包装器将插入一个名为 的附加数据集作为输入CSnapshotDataset,并生成一个生成此(空)数据集的转换。

生成的自动生成的转换将始终在构建数据集时CSnapshotDataset将空事务放入数据集中。SNAPSHOT当有来自 的新快照事务时CSnapshotDataset,您的转换也会输出一个快照事务。

然后,要从给定点开始对您的管道进行快照,例如从并包括数据集C,您只需选择Cs 快照数据集(CSnapshotDataset在本例中)并构建它。管道的下一次(计划的)运行将快照C及其下游的所有内容。

要定期运行它,您可以设置一个时间表来构建CSnapshotDataset

我慷慨地应用这个包装器(通常用于我编写的任何转换),这使我可以灵活地从可能需要它的任何数据集中对管道进行快照。

虽然设置它需要更多的前期工作,但它的主要优点是:

  • 只需单击一下即可启动快照,单击几下即可设置计划的快照,而不必进行多次curl调用
  • 它保持输入和输出数据集的交易历史干净
  • 它完全发生在平台内,无需使用命令行、jenkins 或类似工具提取令牌
于 2020-10-12T11:50:54.213 回答
0

我想你可以

对于输入: input = input.dataframe('current')

对于输出: output.set_mode('replace')

于 2020-09-25T08:03:06.620 回答