1

我有两个节点独立集群用于火花流处理。下面是我的示例代码,它演示了我正在执行的过程。

sparkConf.setMaster("spark://rsplws224:7077") 
val ssc=new StreamingContext()
println(ssc.sparkContext.master)
val inDStream = ssc.receiverStream  //batch of 500 ms as i would like to have 1 sec latency 
val filteredDStream = inDStream.filter  // filtering unwanted tuples 
val keyDStream = filteredDStream.map    // converting to pair dstream 
val stateStream = keyDStream .updateStateByKey //updating state for history 

stateStream.checkpoint(Milliseconds(2500))  // to remove long lineage and meterilizing state stream 
stateStream.count()

val withHistory = keyDStream.join(stateStream) //joining state wit input stream for further processing 
val alertStream = withHistory.filter // decision to be taken by comparing history state and current tuple data
alertStream.foreach // notification to other system 

我的问题是 spark 没有将此状态 RDD 分配给多个节点或没有将任务分配给其他节点并导致响应的高延迟,我的输入负载约为每秒 100,000 个元组。

我已经尝试过以下事情,但没有任何效果

1)spark.locality.wait到 1 秒

2)减少分配给执行程序进程的内存以检查天气火花分发RDD或任务,但即使它超出了驱动器也在运行的第一个节点(m1)的内存限制。

3) 将 spark.streaming.concurrentJobs 从 1(默认)增加到 3

4) 我检查了流 ui 存储,状态 dstream RDD 大约有 20 个分区,都位于本地节点 m1 上。

如果我运行 SparkPi 100000,那么 spark 能够在几秒钟(30-40)后利用另一个节点,所以我确信我的集群配置很好。

编辑

我注意到的一件事是,即使对于我的 RDD,如果我设置存储级别 MEMORY_AND_DISK_SER_2 然后也在应用程序 ui 存储中显示Memory Serialized 1x Replicated

4

3 回答 3

2

Spark 不会自动在集群中分发流数据,因为它倾向于充分利用数据局部性(在其数据所在的位置启动任务会更好,这是默认配置)。但是您可以使用重新分区来分发流数据并提高并行度。您可以访问http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#performance-tuning了解更多信息。

于 2015-07-23T00:54:29.283 回答
1

如果您没有访问集群并且您的作业仅在本地运行,则很可能意味着您的 Spark MasterSparkConf设置为本地 URI 而不是主 URI。

于 2014-06-29T14:01:47.307 回答
1

默认情况下,spark.default.parallelism 属性的值为“本地模式”,因此所有任务都将在接收数据的节点中执行。在 spark-defaults.conf 文件中更改此属性以提高并行度。

于 2015-02-04T09:23:18.780 回答