1

我正在使用 Spark 2.0.0。这是我的代码:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * Minimal Spark SQL example: loads a CSV file, exposes it as a temporary view,
 * runs a query containing a sub-query over that view, and inspects the query's
 * logical plan before and after applying `EliminateSubqueryAliases`.
 */
object WikiDataframe {

  /**
   * Reads the CSV file at `FILE_LOCATION` (first row treated as the header,
   * column types inferred) and registers it as the temporary view
   * `pageviews_by_second`.
   *
   * @param sparkSession session used to read the file
   * @return the loaded DataFrame
   */
  def getDataframe(sparkSession: SparkSession): DataFrame = {
    val df = sparkSession.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(FILE_LOCATION)

    // registerTempTable is deprecated since Spark 2.0.0; createOrReplaceTempView
    // is the replacement it delegates to.
    df.createOrReplaceTempView("pageviews_by_second")

    df
  }

  /**
   * Entry point: builds a local SparkSession, registers the page-views view,
   * prints the logical plan of a sample query, and applies
   * `EliminateSubqueryAliases` to that plan.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("Spark SQL Example")
      .master("local")
      .getOrCreate()

    try {
      // Called for its side effect only: it registers the "pageviews_by_second"
      // view that the SQL text below refers to.
      getDataframe(sparkSession)

      val query: DataFrame =
        sparkSession.sql("select Date from (select * from pageviews_by_second ) a")

      val logicalQuery: LogicalPlan = query.queryExecution.logical

      println(s"logicalQuery : $logicalQuery")

      println("Eliminating sub queries")

      // The rewritten plan is not consumed any further here, mirroring the
      // original snippet. NOTE(review): `logical` is presumably the plan before
      // analysis; `queryExecution.analyzed` may be the intended input — confirm.
      val withoutSubqueryAliases: LogicalPlan = EliminateSubqueryAliases(logicalQuery)
    } finally {
      // Release the local Spark context even if reading or planning fails.
      sparkSession.stop()
    }
  }
}

我卡在了如何执行 logicalQuery 这一步。如果可能的话,我想由它得到一个 DataFrame/Dataset。任何帮助都将不胜感激。

4

1 回答 1

0

你需要在 org.apache.spark.sql 包下写一个类,并包含如下内容:

/**
 * Turns a logical plan into a DataFrame bound to the given session by
 * delegating to `Dataset.ofRows`.
 *
 * NOTE(review): the accompanying answer says this must live in a class inside
 * package `org.apache.spark.sql`; presumably `Dataset.ofRows` is only visible
 * from that package — confirm against the Spark version in use.
 *
 * @param Sqlctx session the resulting DataFrame is attached to
 * @param Plan   logical plan to wrap
 * @return the DataFrame produced by `Dataset.ofRows` for `Plan`
 */
def apply(Sqlctx: SparkSession, Plan: LogicalPlan): DataFrame =
  Dataset.ofRows(Sqlctx, Plan)
于 2017-07-26T10:28:04.230 回答