1

我们正在尝试使用Spark StreamingSpark SQL实现一个用例,它允许我们针对某些数据运行用户定义的规则(有关如何捕获和使用数据,请参见下文)。这个想法是使用 SQL 来指定规则并将结果作为警报返回给用户。基于每个传入事件批次执行查询似乎非常慢。如果有人能提出更好的方法来实现这个用例,我将不胜感激。另外,想知道 Spark 是在驱动程序还是工作程序上执行 sql?提前致谢。以下是我们为实现这一目标而执行的步骤 -

1) 从外部数据库加载初始数据集作为 JDBCRDD

JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);

2)创建一个传入的 DStream(捕获对初始化数据的更新)

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);

JavaDStream<SomeState> incomingDStream = flumeStream.map(...);

3) 使用传入的 DStream 创建一个 Pair DStream

JavaPairDStream<Object,SomeState> pairDStream =
            incomingDStream.map(...);

4)使用初始化的RDD作为基础状态从pair DStream创建一个Stateful DStream

JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);

JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);

5) 根据传入流中的值对更新状态运行用户驱动的查询

incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {

            @Override
            public Void call(JavaRDD<SomeState> events) throws Exception { 

                updatedStateRDD.count();
                SQLContext sqx = new SQLContext(events.context());
                schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
                schemaDf.registerTempTable("TEMP_TABLE");
                sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);

                //collect the results and process and send alerts
                ...

            }
);
4

2 回答 2

0

我也面临同样的问题,请问您是否有相同的解决方案?虽然我在下面的帖子中提到了详细的用例。

Spark SQL + Window + Streming 问题 - 使用 Spark 流运行时,Spark SQL 查询需要很长时间才能执行

于 2015-09-08T09:27:41.563 回答
0

第一步应该是确定哪个步骤花费的时间最多。请查看 Spark Master UI 并确定哪个步骤/阶段花费的时间最多。

您可以考虑一些最佳实践+我的观察:-

  1. 使用单例 SQLContext - 请参阅示例 - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
  2. updateStateByKey 在有大量键的情况下可能是内存密集型操作。您需要检查 updateStateByKey 函数处理的数据大小,以及它是否适合给定的内存。
  3. 你的 GC 表现如何?
  4. 你真的在使用“initialRDD”吗?如果没有,则不要加载它。如果它是静态数据集,则将其缓存。
  5. 检查您的 SQL 查询所花费的时间。

这里还有一些可以帮助您的问题/领域

  1. DStreams 的 StorageLevel 是什么?
  2. 集群大小和集群配置
  3. 火花版本?

最后 - ForEachRDD 是一个输出操作,它在驱动程序上执行给定的功能,但 RDD 可能会执行操作,并且这些操作会在工作节点上执行。

您可能需要阅读此内容以更好地解释输出操作 - http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

于 2015-08-13T03:44:45.870 回答