我想将 SQL 字符串作为用户输入,然后在执行前对其进行转换。特别是,我想修改顶层投影(select 子句),注入额外的列以供查询检索。
我希望通过使用sparkSession.experimental.extraOptimizations
. 我知道我正在尝试的并不是严格意义上的优化(转换改变了 SQL 语句的语义),但 API 似乎仍然合适。但是,查询执行器似乎忽略了我的转换。
这是一个最小的例子来说明我遇到的问题。首先定义一个行案例类:
case class TestRow(a: Int, b: Int, c: Int)
然后定义一个简单地丢弃任何投影的优化规则:
object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case x: Project => x.child
}
}
现在创建一个数据集,注册优化,并运行 SQL 查询:
// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)
// Register "optimisation".
sparkSession.experimental.extraOptimizations =
Seq(RemoveProjectOptimisationRule)
// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")
// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)
这是输出:
Query result:
[1]
== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
+- 'UnresolvedRelation `testtable`
== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
+- SubqueryAlias testtable
+- LocalRelation [a#3, b#4, c#5]
== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]
== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]
我们看到结果与原始 SQL 语句的结果相同,但未应用转换。然而,在打印逻辑和物理计划时,投影确实已被删除。我还确认(通过调试日志输出)确实正在调用转换。
关于这里发生了什么的任何建议?也许优化器只是忽略了改变语义的“优化”?
如果使用优化不是要走的路,有人可以提出替代方案吗?我真正想做的就是解析输入的 SQL 语句,对其进行转换,然后将转换后的 AST 传递给 Spark 以执行。但据我所知,执行此操作的 API 对 Sparksql
包来说是私有的。可能可以使用反射,但我想避免这种情况。
任何指针将不胜感激。