2

因为,使用withDataFrame很容易通过一些操作生成一个新列。要做这样的事情,我想我会使用这个函数:udfdf.withColumn("newCol", myUDF("someCol"))Datasetmap

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]

您必须将整个案例类T作为输入传递给函数。如果Dataset[T]有很多字段/列,如果您只想通过对T. 我的问题是,Catalyst 是否足够聪明,能够对此进行优化?

4

2 回答 2

3

Catalyst 是否足够聪明,能够对此进行优化?

tl;dr No. 请参阅SPARK-14083 分析 JVM 字节码并将闭包转换为 Catalyst 表达式

目前,Spark SQL 的 Catalyst Optimizer 无法知道您在 Scala 代码中做了什么。

引用SPARK-14083

Dataset API 的一大优势是类型安全,但由于严重依赖用户定义的闭包/lambdas 而以性能为代价。这些闭包通常比表达式慢,因为我们可以更灵活地优化表达式(已知数据类型,无虚函数调用等)。在许多情况下,查看这些闭包的字节码并弄清楚它们要做什么实际上并不难。如果我们能理解它们,那么我们就可以将它们直接转化为 Catalyst 表达式,以实现更优化的执行。

甚至提到了你的案例:

df.map(_.name) // 等价于表达式col("name")

正如你所看到的,它仍然是开放的,我怀疑目前是否有人在这方面工作。


您可以对 Spark Optimizer 做些什么来帮助select这一列,然后才使用map带有单参数 UDF 的运算符。

这肯定符合您不将整个 JVM 对象传递给您的函数的要求,但不会摆脱从内部行表示到您的 Scala 对象的缓慢反序列化(这将落在 JVM 上并占用一些空间,直到 GC 发生)。

于 2017-05-17T14:48:39.847 回答
0

我试图弄清楚自己,因为我在任何地方都找不到回应。

让我们有一个数据集,其中包含具有多个字段的案例类:

scala> case class A(x: Int, y: Int)
scala> val dfA = spark.createDataset[A](Seq(A(1, 2)))
scala> val dfX = dfA.map(_.x)

现在,如果我们检查优化计划,我们会得到以下信息:

scala> val plan = dfX.queryExecution.optimizedPlan

SerializeFromObject [input[0, int, true] AS value#8]
    +- MapElements <function1>, obj#7: int
        +- DeserializeToObject newInstance(class A), obj#6: A
           +- LocalRelation [x#2, y#3]    

根据更详细plan.toJSONDeserializeToObject步骤,假设xy都存在。

正如您所证明的那样,例如以下代码片段,它使用反射而不是直接接触A仍然有效的字段。

val dfX = dfA.map(
  _.getClass.getMethods.find(_.getName == "x").get.invoke(x).asInstanceOf[Int]
)
于 2017-05-17T14:27:58.917 回答