2

在 Spark-2.0 中,创建 Spark 会话的最佳方式是什么。因为在 Spark-2.0 和 Cassandra 中,API 都进行了重新设计,基本上弃用了 SqlContext(以及 CassandraSqlContext)。因此,为了执行 SQL,我创建了一个 Cassandra Session (com.datastax.driver.core.Session) and use execute( " ")。或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)方法。

我不知道两者的 SQL 限制 - 有人可以解释一下。

此外,如果我必须创建 SparkSession - 我该怎么做 - 找不到任何合适的示例。随着 API 的重新设计,旧的示例不起作用。我正在通过这个代码示例——DataFrames——不清楚这里使用的是什么 sql 上下文(这是正确的方法。)(由于某种原因,不推荐使用的 API 甚至没有编译——需要检查我的 eclipse 设置)

谢谢

4

1 回答 1

9

您需要 Cassandra Session 来从 Cassandra DB 创建/删除键空间和表。在 Spark 应用程序中,为了创建 Cassandra 会话,您需要将 SparkConf 传递给 CassandraConnector。在 Spark 2.0 中,你可以像下面那样做。

 SparkSession spark = SparkSession
              .builder()
              .appName("SparkCassandraApp")
              .config("spark.cassandra.connection.host", "localhost")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[2]")
              .getOrCreate();

CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");

如果您有现有的 Dataframe,那么您也可以使用在 Cassandra 中创建表DataFrameFunctions.createCassandraTable(Df)在此处查看 api 详细信息。

您可以使用 spark-cassandra-connector 提供的 api 从 Cassandra DB 读取数据,如下所示。

Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "mykeyspace");
                    put("table", "mytable");
                }
            }).load();

dataset.show(); 

您可以使用 SparkSession.sql() 方法在由 spark cassandra 连接器返回的 Dataframe 上创建的临时表上运行查询,如下所示。

dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();
于 2016-12-08T05:02:11.747 回答