32

从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:

  • 和有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我理解正确吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在 DataSet 表示中/没有额外的开销(2.0.0 的性能明智)
  • 所有其他命令,例如 select、.. 只是语法糖。它们不是类型安全的,可以使用映射。df.select("foo")如果没有 map 语句 ,我怎么能保证类型安全?
    • 为什么我应该使用 UDF / UADF 而不是地图(假设地图保留在数据集表示中)?
4

3 回答 3

37
  1. df.select("foo") 和之间的区别df.select($"foo")是签名。前者至少取一String,后者取零或更多Columns。除此之外没有实际区别。
  2. myDataSet.map(foo.someVal)类型检查,但由于任何Dataset操作都使用RDD对象,并且与DataFrame操作相比,开销很大。我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    如您所见,此执行计划需要访问所有字段并且必须DeserializeToObject.

  3. 不。一般来说,其他方法不是语法糖,并且会生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。

  4. 如果没有 map 语句,我怎么能 df.select("foo") 类型安全?

    没有这样的选择。虽然类型化列允许您静态Dataset转换为另一个静态类型Dataset

    ds.select($"bar".as[Int])
    

    没有类型安全。还有一些其他尝试包括类型安全优化操作,例如类型化聚合,但是这个实验性 API。

  5. 为什么我应该使用 UDF / UADF 而不是地图

    这完全取决于你。Spark 中的每个分布式数据结构都有自己的优点和缺点(例如,请参见Spark UDAF with ArrayType as bufferSchema 性能问题)。

就个人而言,我发现静态类型Dataset最没用:

  • 不提供Dataset[Row]DataFrame.

  • 类型化转换是黑盒,有效地为优化器创建分析障碍。例如,选择(过滤器)不能被推送到类型转换:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    相比:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    这会影响谓词下推或投影下推等功能。

  • 没有像RDDs原生支持的一小部分类型那样灵活。

  • “类型安全”在使用方法转换Encoders时是有争议的。由于未使用签名对数据形状进行编码,因此编译器只能验证.DatasetasEncoder

相关问题:

于 2016-11-15T05:48:17.863 回答
1

SparkDataset比 Spark 更强大Dataframe。小例子 - 您只能创建Dataframeof或任何原始数据类型,但Row也可以创建任何非原始类型。即您可以从字面上创建对象类型。TupleDatasetDatasetDataset

前任:

case class Employee(id:Int,name:String)

Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid
于 2018-05-16T19:47:47.903 回答
0

DATAFRAME:DataFrame 是一种抽象,它允许数据的模式视图。

案例类人(姓名:字符串,年龄:整数,地址:字符串)

定义类人

scala > val df = List ( Person ( “Sumanth”, 23, “BNG”)

数据帧与数据集

DATASET:Data Set 是 Dataframe API 的扩展,是最新的抽象,它试图提供 RDD 和 Dataframe 的最佳性能。

于 2019-11-23T13:39:21.737 回答