TL;DR:我听说某些 PySpark 函数在 Transforms 中是不可取的,但我不确定哪些函数是错误的,为什么会这样?
为什么我不能collect()
在某些情况下将我的数据放到一个列表中并遍历行?
TL;DR:我听说某些 PySpark 函数在 Transforms 中是不可取的,但我不确定哪些函数是错误的,为什么会这样?
为什么我不能collect()
在某些情况下将我的数据放到一个列表中并遍历行?
这里有很多部分需要理解才能得出最终结论,即这些collect()
功能和其他功能是 Spark 的低效使用。
首先,让我们介绍一下本地计算与分布式计算之间的区别。在 Spark 中,您通常执行的pyspark.sql.functions
和pyspark.sql.DataFrame
操作,例如join()
,或groupBy()
会将这些操作的执行委托给底层 Spark 库,以获得最大可能的性能。可以将这视为简单地将 Python 用作 SQL 之上的一种更方便的语言,您可以在其中懒惰地描述您希望 Spark 为您执行的操作。
这样,当您坚持使用 PySpark 中的 SQL 操作时,您可以获得高度可扩展的性能,但仅限于您可以用 SQL 表达的东西。这就是人们通常可以采取懒惰的方法并使用for
循环来实现他们的转换而不是考虑最佳策略的地方。
让我们考虑一下您只想将单个值添加到 DataFrame 中的整数列的情况。你会在 Stack Overflow 和其他地方找到很多例子,在一些更微妙的情况下,他们建议使用collect()
将数据带入 Python 列表,遍历每一行,并在完成后将数据推回 DataFrame,这是您可以在此处执行的一种策略。但是,让我们考虑一下它在实践中的含义:您将托管在 Spark 中的数据带回构建的驱动程序,以便使用 Python 中的单个线程在每一行上循环,并为每一行添加一个常量值一次。如果我们发现(在这种情况下很明显)与此操作等效的 SQL,Spark 可以获取您的数据并以大规模并行方式将值添加到各个行。也就是说,如果您有 64 个执行者(可用于完成工作的工作人员实例),那么您将有 64 个“核心”(这不是一个完美的类比,但很接近)来将数据拆分并发送到每个用于将值添加到列中。
在驱动程序上工作是我所说的“本地”计算,在执行程序中工作是“并行”。
这可能是一个明显的例子,但在处理更困难的转换(例如高级窗口操作或线性代数计算)时,通常很难记住这种差异。Spark 具有可用于以分布式方式进行矩阵乘法和操作的库,以及在 Windows 上需要更多考虑您的问题的一些非常高级的操作。
使用 PySpark 最有效的方法是发送关于如何一次构建 DataFrame 的“指令”,以便 Spark 可以找出实现这些数据的最佳方法。这样,如果可能的话,应该避免强制计算 DataFrame 以便您可以在代码中的某个位置对其进行检查的函数;他们的意思是 Spark 正在额外工作以满足您的print()
语句或其他方法调用,而不是努力写出您的数据。
Python 运行时实际上是在 JVM 中执行的,而 JVM 又与 Spark 运行时通信,后者是用 Scala 编写的。因此,对于collect()
您希望在 Python 中具体化数据的每次调用,Spark 必须将您的数据具体化为单个本地可用的 DataFrame,然后将其从 Scala 合成为其 Java 等价物,最后从 JVM 传递到 Python 等价物在它可以迭代之前。这是一个非常低效的过程,无法并行化。这导致将数据呈现给 Python 的操作非常建议避免.
那么,应该避免哪些功能呢?
这些方法中的每一个都将强制在 DataFrame 上执行并将结果带回 Python 运行时以供显示/使用。这意味着 Spark 将没有机会懒惰地找出计算数据的最有效方法,而是被迫在继续执行任何其他操作之前带回请求的数据。