观察
只有在连接条件基于相等运算符时,Spark SQL 才能优化连接。这意味着我们可以分别考虑等值连接和非等值连接。
等值连接
Equijoin 可以通过映射Datasets
到 (key, value) 元组、基于键执行连接以及重塑结果以类型安全的方式实现:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
非等值连接
可以使用关系代数运算符表示为 R ⋈θ S = σθ(R × S) 并直接转换为代码。
火花2.0
启用crossJoin
和使用joinWith
平凡相等的谓词:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
火花 2.1
使用crossJoin
方法:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
例子
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
笔记
应该注意的是,这些方法在质量上与直接joinWith
应用程序不同,并且需要昂贵的DeserializeToObject
/SerializeFromObject
转换(相比之下,直接joinWith
可以对数据使用逻辑操作)。
这类似于Spark 2.0 Dataset vs DataFrame中描述的行为。
如果您不限于 Spark SQL APIframeless
提供了有趣的类型安全扩展Datasets
(截至目前,它仅支持 Spark 2.0):
import frameless.TypedDataset
val typedPoints1 = TypedDataset.create(points1)
val typedPoints2 = TypedDataset.create(points2)
typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
Dataset
API 在 1.6 中不稳定,所以我认为在那里使用它没有意义。
当然,这种设计和描述性名称不是必需的。您可以轻松地使用类型类将此方法隐式添加到Dataset
与内置签名没有冲突的情况下,因此两者都可以调用joinWith
。