1

我有一些 PySpark 代码正在编写,我想在其中执行连接和其他操作,但我想在此阶段成功完成时记录下来。

为什么我没有看到按我期望的顺序记录?即使我的工作仍在继续工作,似乎一切都立即出现了……

4

1 回答 1

2

在描述 PySpark 将要执行的工作时,了解 PySpark 使用的不同模型非常重要。

PySpark 在其查询评估中基本上是惰性的,并且会等待执行您请求的任何工作,直到绝对必要。

这意味着即使您描述了一个连接,记录一些事情,然后继续另一个连接,第一个连接实际上还没有被执行。这是因为在正常情况下,它实际上不会开始做任何事情,直到您write_dataframe在转换的最后调用该方法。

一些例外情况是.count(), .first(),.take()以及任何强制 Spark 评估传入 DataFrame 并将其结果返回给您的内容。这意味着它将被迫在 之前实际评估查询.count(),并在它在您的 Python 代码中进一步执行之前将其结果返回给您。

这就是为什么出于性能原因,在代码中使用诸如此类的方法是一种反模式,因为它们可能不会直接影响您的最终数据集构建;他们正在具体化一个可能不会导致您的输出的摘要。

对于一个具体的例子,让我们考虑以下内容:

my_input_df = ...
my_other_df = ...

my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined!")
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)

I joined!将在您的工作开始时打印到控制台,并且您的工作将继续执行物化joinwithColumn就好像什么都没发生一样。 这是因为 Python 不会阻止您的 print 语句的主线程,因为它不会强制评估您的 DataFrame

但是,如果我将代码更改为以下内容:

my_input_df = ...
my_other_df = ...

my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined {0} rows!".format(my_joined_df.count())
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)

然后我会I joined X rows!在日志中观察,我的工作会停止执行以实现count之前的工作。这将意味着在我的整体构建中执行速度较慢,并且很可能会重新实现我们之前所做的工作。这就是为什么在代码存储库中编写代码时,您会经常注意到这些方法的警告,这些方法会停止执行输出。

我们通常鼓励用户编写鼓励对 DataFrame 进行惰性评估的代码,并避免停止工作以将内容打印到控制台。日志可能会超出您预期的顺序或降低您的计算速度。

于 2021-12-16T19:31:57.987 回答