5

我有一个创建两个数据集的数据连接源:

  • 数据集 X(快照)
  • 数据集 Y(增量)

这两个数据集来自同一来源。数据集X包含源表中所有行的当前状态。数据集Y提取自上次构建以来已更新的所有行。然后将这两个数据集在下游合并到 datasetZ中, datasetZ是 datasetX或 dataset 中每一行的最新版本Y。这使我们既可以进行低延迟更新,又可以保持良好的分区。

在源表中删除行时,这些行不再存在于 dataset 中,X但仍存在于 dataset 中Y

将这些“已删除”行保留在数据集中的最佳方法是什么Z?理想情况下,我还可以在Y不丢失任何“已删除”行的情况下对数据集进行快照。

4

1 回答 1

4

好问题!据我了解,您希望数据集Z仅包含最新的行,包括最新的已删除行。更新的行和删除的行都存在于Y. 在这种情况下,我建议首先合并YX一起,以便所有行,包括已删除的行都存在于联合数据集中。然后,在日期列上使用窗口函数以获得每一行的最新版本。以下是我建议的 pyspark 代码大纲:

from pyspark.sql import Window
import pyspark.sql.functions as F

window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union to get all columns, including deleted
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row

请注意,此解决方案无法解决 Y 快照会发生什么的问题。

于 2021-10-15T15:46:40.827 回答