Spark 的 RDD 还构建了物理计划,可以在同一阶段组合/优化多个转换。DataSet 比 RDD 有什么好处?
使用 RDD 时,所写即所得。虽然某些转换通过链接进行了优化,但执行计划是 DAG 的直接转换。例如:
rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h).shuffle()
其中shuffle
是任意洗牌变换 ( *byKey
, repartition
, 等)。所有三个mapPartitions
( map
, flatMap
, filter
) 将被链接起来而不创建中间对象但不能重新排列。
与之相比,Datasets
使用限制性更强的编程模型,但可以使用多种技术优化执行,包括:
选择 ( filter
) 下推。例如,如果您有:
df.withColumn("foo", col("bar") + 1).where(col("bar").isNotNull())
可以执行为:
df.where(col("bar").isNotNull()).withColumn("foo", col("bar") + 1)
早期预测 ( select
) 和消除。例如:
df.withColumn("foo", col("bar") + 1).select("foo", "bar")
可以改写为:
df.select("foo", "bar").withColumn("foo", col("bar") + 1)
以避免获取和传递过时的数据。在极端情况下,它可以完全消除特定的变换:
df.withColumn("foo", col("bar") + 1).select("bar")
可以优化为
df.select("bar")
这些优化是可能的,原因有两个:
- 限制性数据模型,无需复杂且不可靠的静态代码分析即可进行依赖性分析。
- 清晰的运算符语义。运算符没有副作用,我们清楚地区分确定性和非确定性。
为了清楚起见,假设我们有以下数据模型:
case class Person(name: String, surname: String, age: Int)
val people: RDD[Person] = ???
我们要检索所有 21 岁以上的人的姓氏。用RDD
它可以表示为:
people
.map(p => (p.surname, p.age)) // f
.filter { case (_, age) => age > 21 } // g
现在让我们问自己几个问题:
age
输入inf
和age
变量 with之间有什么关系g
?
- 然后和
f
然后g
一样吗?g
f
f
和g
副作用免费吗?
虽然答案对于人类读者来说是显而易见的,但对于假设的优化器来说却不是。与Dataframe
版本相比:
people.toDF
.select(col("surname"), col("age")) // f'
.where(col("age") > 21) // g'
对于优化器和人类读者来说,答案都很清楚。
当使用静态类型Datasets
(Spark 2.0 Dataset vs DataFrame)时,这会产生一些进一步的后果。
DataSet 有更高级的类型化吗?
- 不 - 如果您关心优化。最高级的优化仅限于
Dataset[Row]
并且目前无法对复杂类型层次结构进行编码。
- 也许 - 如果您接受 Kryo 或 Java 编码器的开销。
“矢量化操作”是什么意思?
在优化的上下文中,我们通常指的是循环矢量化/循环展开。Spark SQL 使用代码生成来创建高级转换的编译器友好版本,可以进一步优化以利用矢量化指令集。
据我了解,DataSet 的低内存管理 = 高级序列化。
不完全是。使用本机分配的最大优点是逃避垃圾收集器循环。由于垃圾收集通常是 Spark 中的一个限制因素,因此这是一个巨大的改进,尤其是在需要大型数据结构的上下文中(例如准备 shuffle)。
另一个重要方面是列式存储,它可以实现有效的压缩(可能会降低内存占用)并优化压缩数据的操作。
一般来说,您可以使用纯手工制作的代码应用完全相同类型的优化RDDs
。毕竟Datasets
都有后盾RDDs
。不同的只是需要付出多少努力。
- 手工制作的执行计划优化相对容易实现。
- 使代码编译器友好需要一些更深入的知识,并且容易出错且冗长。
- 使用
sun.misc.Unsafe
本机内存分配不适合胆小的人。
尽管Dataset
API 有其所有优点,但它并不是通用的。虽然某些类型的常见任务可以在许多情况下从其优化中受益,但与 RDD 等效项相比,您可能没有任何改进甚至性能下降。