这是一种可以从分桶中受益的常见模式。
其要点是:根据您的列将输出 SNAPSHOT 写入存储桶,其中完全跳过primary_key
了对更大输出进行改组的昂贵步骤。
这意味着您只需要将新数据交换到已经包含您先前历史记录的存储桶中。
让我们从初始状态开始,我们在之前计算的“最新”版本上运行,该版本是一个缓慢的 SNAPSHOT:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT
如果我们clean_dataset
在列上使用分桶写入primary_key
单独计算的桶数以适应我们预期的数据规模,我们将需要以下代码:
from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
@transform(
my_output=Output("/datasets/clean_dataset"),
my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):
BUCKET_COUNT = 600
PRIMARY_KEY = "primary_key"
ORDER_COL = "update_ts"
updated_keys = my_input.dataframe("added")
last_written = my_output.dataframe("current")
updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)
value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]
updated_keys = updated_keys.select(
PRIMARY_KEY,
*[F.col(x).alias("updated_keys_" + x) for x in value_cols]
)
last_written = last_written.select(
PRIMARY_KEY,
*[F.col(x).alias("last_written_" + x) for x in value_cols]
)
all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")
latest_df = all_rows.select(
PRIMARY_KEY,
*[F.coalesce(
F.col("updated_keys_" + x),
F.col("last_written_" + x)
).alias(x) for x in value_cols]
)
my_output.set_mode("replace")
return my_output.write_dataframe(
latest_df,
bucket_cols=PRIMARY_KEY,
bucket_count=BUCKET_COUNT,
sort_by=ORDER_COL
)
当它运行时,您会在查询计划中注意到,项目在输出上的单步不再包含 exchange,这意味着它不会对数据进行混洗。您现在将看到的唯一交换是在输入上,它需要以与输出格式完全相同的方式分发更改(这是一个非常快速的操作)。
然后将此交换保留到fullouter
连接步骤中,然后连接将利用此交换并非常快速地运行 600 个任务。最后,我们通过在与以前相同的列上显式地分桶到相同数量的桶中来维护输出的格式。
注意:使用这种方法,您在每个存储桶中的文件大小会随着时间的推移而增长,并且不会考虑增加存储桶数量以保持大小合适的需要。您最终将使用这种技术达到一个阈值,文件大小超过 128MB,并且您不再有效地执行(修复是提高BUCKET_COUNT
值)。
您的输出现在将如下所示:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: BUCKET_COUNT by PRIMARY_KEY
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT