我们正在尝试使用Spark Streaming和Spark SQL实现一个用例,它允许我们针对某些数据运行用户定义的规则(有关如何捕获和使用数据,请参见下文)。这个想法是使用 SQL 来指定规则并将结果作为警报返回给用户。基于每个传入事件批次执行查询似乎非常慢。如果有人能提出更好的方法来实现这个用例,我将不胜感激。另外,想知道 Spark 是在驱动程序还是工作程序上执行 sql?提前致谢。以下是我们为实现这一目标而执行的步骤 -
1) 从外部数据库加载初始数据集作为 JDBCRDD
JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);
2)创建一个传入的 DStream(捕获对初始化数据的更新)
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);
JavaDStream<SomeState> incomingDStream = flumeStream.map(...);
3) 使用传入的 DStream 创建一个 Pair DStream
JavaPairDStream<Object,SomeState> pairDStream =
incomingDStream.map(...);
4)使用初始化的RDD作为基础状态从pair DStream创建一个Stateful DStream
JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);
JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);
5) 根据传入流中的值对更新状态运行用户驱动的查询
incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {
@Override
public Void call(JavaRDD<SomeState> events) throws Exception {
updatedStateRDD.count();
SQLContext sqx = new SQLContext(events.context());
schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
schemaDf.registerTempTable("TEMP_TABLE");
sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);
//collect the results and process and send alerts
...
}
);