34

我喜欢 Spark 数据集,因为它们在编译时给我分析错误和语法错误,还允许我使用 getter 而不是硬编码的名称/数字。大多数计算都可以通过 Dataset 的高级 API 完成。例如,通过访问 Dataset 类型的对象来执行agg、select、sum、avg、map、filter 或 groupBy操作比使用 RDD 行的数据字段要简单得多。

但是这里缺少连接操作,我读到我可以像这样进行连接

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

但这不是我想要的,因为我更愿意通过案例类接口来做,所以更像这样

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

目前最好的选择似乎是在案例类旁边创建一个对象,并给这个函数提供正确的列名作为字符串。所以我会使用第一行代码,但放置一个函数而不是硬编码的列名。但这感觉不够优雅..

有人可以在这里给我其他选择的建议吗?目标是从实际的列名中抽象出来,并最好通过案例类的 getter 工作。

我正在使用 Spark 1.6.1 和 Scala 2.10

4

2 回答 2

36

观察

只有在连接条件基于相等运算符时,Spark SQL 才能优化连接。这意味着我们可以分别考虑等值连接和非等值连接。

等值连接

Equijoin 可以通过映射Datasets到 (key, value) 元组、基于键执行连接以及重塑结果以类型安全的方式实现:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

非等值连接

可以使用关系代数运算符表示为 R ⋈θ S = σθ(R × S) 并直接转换为代码。

火花2.0

启用crossJoin和使用joinWith平凡相等的谓词:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

火花 2.1

使用crossJoin方法:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

例子

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

笔记

  • 应该注意的是,这些方法在质量上与直接joinWith应用程序不同,并且需要昂贵的DeserializeToObject/SerializeFromObject转换(相比之下,直接joinWith可以对数据使用逻辑操作)。

    这类似于Spark 2.0 Dataset vs DataFrame中描述的行为。

  • 如果您不限于 Spark SQL APIframeless提供了有趣的类型安全扩展Datasets(截至目前,它仅支持 Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
    
  • DatasetAPI 在 1.6 中不稳定,所以我认为在那里使用它没有意义。

  • 当然,这种设计和描述性名称不是必需的。您可以轻松地使用类型类将此方法隐式添加到Dataset与内置签名没有冲突的情况下,因此两者都可以调用joinWith

于 2016-11-18T09:53:01.180 回答
3

此外,非类型安全 Spark API 的另一个更大问题是,当您加入两个 时Datasets,它会给您一个DataFrame. 然后您会丢失原始两个数据集中的类型。

val a: Dataset[A]
val b: Dataset[B]

val joined: Dataframe = a.join(b)
// what would be great is 
val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)
于 2018-11-23T20:26:49.557 回答