您需要 Cassandra Session 来从 Cassandra DB 创建/删除键空间和表。在 Spark 应用程序中,为了创建 Cassandra 会话,您需要将 SparkConf 传递给 CassandraConnector。在 Spark 2.0 中,你可以像下面那样做。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
如果您有现有的 Dataframe,那么您也可以使用在 Cassandra 中创建表DataFrameFunctions.createCassandraTable(Df)
。在此处查看 api 详细信息。
您可以使用 spark-cassandra-connector 提供的 api 从 Cassandra DB 读取数据,如下所示。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
您可以使用 SparkSession.sql() 方法在由 spark cassandra 连接器返回的 Dataframe 上创建的临时表上运行查询,如下所示。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();