3

在阅读了几篇关于 Spark 数据集的精彩文章(thisthisthis)之后,我总结了下一个 DataSet 相对于 RDD 的性能优势:

  1. 逻辑和物理计划优化;
  2. 严格的类型化;
  3. 矢量化操作;
  4. 低级内存管理。

问题:

  1. Spark 的 RDD 还构建了物理计划,可以在同一阶段组合/优化多个转换。那么DataSet 相对于 RDD 有什么好处呢?
  2. 第一个链接你可以看到一个例子RDD[Person]DataSet 有高级类型化吗?
  3. “矢量化操作”是什么意思?
  4. 据我了解,DataSet 的低内存管理 = 高级序列化。这意味着可序列化对象的堆外存储,您可以在其中仅读取对象的一个​​字段而无需反序列化。但是当你有IN_MEMORY_ONLY持久性策略的情况下呢?无论如何,DataSet 会序列化所有内容吗?它会比 RDD 有任何性能优势吗?
4

1 回答 1

6

Spark 的 RDD 还构建了物理计划,可以在同一阶段组合/优化多个转换。DataSet 比 RDD 有什么好处?

使用 RDD 时,所写即所得。虽然某些转换通过链接进行了优化,但执行计划是 DAG 的直接转换。例如:

rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h).shuffle()

其中shuffle是任意洗牌变换 ( *byKey, repartition, 等)。所有三个mapPartitions( map, flatMap, filter) 将被链接起来而不创建中间对象但不能重新排列。

与之相比,Datasets使用限制性更强的编程模型,但可以使用多种技术优化执行,包括:

  • 选择 ( filter) 下推。例如,如果您有:

    df.withColumn("foo", col("bar") + 1).where(col("bar").isNotNull())
    

    可以执行为:

    df.where(col("bar").isNotNull()).withColumn("foo", col("bar") + 1)
    
  • 早期预测 ( select) 和消除。例如:

    df.withColumn("foo", col("bar") + 1).select("foo", "bar")
    

    可以改写为:

    df.select("foo", "bar").withColumn("foo", col("bar") + 1)
    

    以避免获取和传递过时的数据。在极端情况下,它可以完全消除特定的变换:

    df.withColumn("foo", col("bar") + 1).select("bar")
    

    可以优化为

    df.select("bar")
    

这些优化是可能的,原因有两个:

  • 限制性数据模型,无需复杂且不可靠的静态代码分析即可进行依赖性分析。
  • 清晰的运算符语义。运算符没有副作用,我们清楚地区分确定性和非确定性。

为了清楚起见,假设我们有以下数据模型:

case class Person(name: String, surname: String, age: Int)

val people: RDD[Person] = ???

我们要检索所有 21 岁以上的人的姓氏。用RDD它可以表示为:

people
  .map(p => (p.surname, p.age))          // f
  .filter { case (_, age) => age > 21 }  // g

现在让我们问自己几个问题:

  • age输入infage变量 with之间有什么关系g
  • 然后和f然后g一样吗?gf
  • fg副作用免费吗?

虽然答案对于人类读者来说是显而易见的,但对于假设的优化器来说却不是。与Dataframe版本相比:

people.toDF
  .select(col("surname"), col("age"))    // f'
  .where(col("age") > 21)                // g'

对于优化器和人类读者来说,答案都很清楚。

当使用静态类型DatasetsSpark 2.0 Dataset vs DataFrame)时,这会产生一些进一步的后果。

DataSet 有更高级的类型化吗?

  • 不 - 如果您关心优化。最高级的优化仅限于Dataset[Row]并且目前无法对复杂类型层次结构进行编码。
  • 也许 - 如果您接受 Kryo 或 Java 编码器的开销。

“矢量化操作”是什么意思?

在优化的上下文中,我们通常指的是循环矢量化/循环展开。Spark SQL 使用代码生成来创建高级转换的编译器友好版本,可以进一步优化以利用矢量化指令集。

据我了解,DataSet 的低内存管理 = 高级序列化。

不完全是。使用本机分配的最大优点是逃避垃圾收集器循环。由于垃圾收集通常是 Spark 中的一个限制因素,因此这是一个巨大的改进,尤其是在需要大型数据结构的上下文中(例如准备 shuffle)。

另一个重要方面是列式存储,它可以实现有效的压缩(可能会降低内存占用)并优化压缩数据的操作。

一般来说,您可以使用纯手工制作的代码应用完全相同类型的优化RDDs。毕竟Datasets都有后盾RDDs。不同的只是需要付出多少努力。

  • 手工制作的执行计划优化相对容易实现。
  • 使代码编译器友好需要一些更深入的知识,并且容易出错且冗长。
  • 使用sun.misc.Unsafe本机内存分配不适合胆小的人。

尽管DatasetAPI 有其所有优点,但它并不是通用的。虽然某些类型的常见任务可以在许多情况下从其优化中受益,但与 RDD 等效项相比,您可能没有任何改进甚至性能下降。

于 2016-12-26T16:50:27.217 回答