我有一些 PySpark 代码正在编写,我想在其中执行连接和其他操作,但我想在此阶段成功完成时记录下来。
为什么我没有看到按我期望的顺序记录?即使我的工作仍在继续工作,似乎一切都立即出现了……
我有一些 PySpark 代码正在编写,我想在其中执行连接和其他操作,但我想在此阶段成功完成时记录下来。
为什么我没有看到按我期望的顺序记录?即使我的工作仍在继续工作,似乎一切都立即出现了……
在描述 PySpark 将要执行的工作时,了解 PySpark 使用的不同模型非常重要。
PySpark 在其查询评估中基本上是惰性的,并且会等待执行您请求的任何工作,直到绝对必要。
这意味着即使您描述了一个连接,记录一些事情,然后继续另一个连接,第一个连接实际上还没有被执行。这是因为在正常情况下,它实际上不会开始做任何事情,直到您write_dataframe
在转换的最后调用该方法。
一些例外情况是.count()
, .first()
,.take()
以及任何强制 Spark 评估传入 DataFrame 并将其结果返回给您的内容。这意味着它将被迫在 之前实际评估查询.count()
,并在它在您的 Python 代码中进一步执行之前将其结果返回给您。
这就是为什么出于性能原因,在代码中使用诸如此类的方法是一种反模式,因为它们可能不会直接影响您的最终数据集构建;他们正在具体化一个可能不会导致您的输出的摘要。
对于一个具体的例子,让我们考虑以下内容:
my_input_df = ...
my_other_df = ...
my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined!")
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)
I joined!
将在您的工作开始时打印到控制台,并且您的工作将继续执行物化join
,withColumn
就好像什么都没发生一样。 这是因为 Python 不会阻止您的 print 语句的主线程,因为它不会强制评估您的 DataFrame。
但是,如果我将代码更改为以下内容:
my_input_df = ...
my_other_df = ...
my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined {0} rows!".format(my_joined_df.count())
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)
然后我会I joined X rows!
在日志中观察,我的工作会停止执行以实现count
之前的工作。这将意味着在我的整体构建中执行速度较慢,并且很可能会重新实现我们之前所做的工作。这就是为什么在代码存储库中编写代码时,您会经常注意到这些方法的警告,这些方法会停止执行输出。
我们通常鼓励用户编写鼓励对 DataFrame 进行惰性评估的代码,并避免停止工作以将内容打印到控制台。日志可能会超出您预期的顺序或降低您的计算速度。