24

我正在尝试使用 Spark Dataset API,但在进行简单连接时遇到了一些问题。

假设我有两个带有 fields: 的数据集date | value,那么在DataFrame我的 join 情况下将如下所示:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

但是对于Dataset.joinWith方法,但相同的方法不起作用:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

所需的论据是.joinWith什么?

4

3 回答 3

39

要使用joinWith您首先必须创建一个DataSet,并且很可能是其中两个。要创建DataSet,您需要创建一个与您的架构匹配的案例类并调用DataFrame.as[T]where Tis your case class。所以:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

您也可以跳过案例类并使用元组:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

然后,如果您有另一个案例类/ DF,就像这样说:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

然后,虽然 和 的语法join相似joinWith,但结果却不同:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

如您所见,joinWith将对象完整地保留为元组的一部分,同时join将列扁平化为单个命名空间。(在上述情况下会出现问题,因为列名“key”重复了。)

奇怪的是,我必须使用df.col("key")anddf2.col("key")来创建加入的条件,ds而且ds2——如果你只col("key")在任一侧使用它就不起作用,ds.col(...)也不存在。df.col("key")然而,使用原版就可以了。

于 2016-04-07T04:13:56.377 回答
9

来自https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html

看起来你可以做

dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
于 2016-04-11T23:38:08.063 回答
2

对于上面的示例,您可以尝试以下操作:

为您的输出定义一个案例类

case class JoinOutput(key:Int, value:String, num1:Double, num2:Long) 

用 连接两个数据集Seq("key"),这将帮助您避免在输出中出现两个重复的键列,这也将有助于在下一步中应用案例类或获取数据

val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

结果将是平坦的:

joined.show

+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
|  1| asdf| 7.7| 101|
|  2|34234| 1.2|  10|
+---+-----+----+----+
于 2017-10-06T03:51:52.497 回答