我正在尝试使用自定义接收器编写 Spark 流应用程序。我应该通过提供具有预定义间隔的随机值来模拟实时输入数据。(简化的)接收器如下所示,相应的 Spark Streaming 应用程序代码如下:
class SparkStreamingReceiver extends Actor with ActorHelper {
private val random = new Random()
override def preStart = {
context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
self ! ("string", random.nextGaussian())
})
}
override def receive = {
case data: (String, Double) => {
store[(String, Double)](data)
}
}
}
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
.setMaster("local")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
val randomValues: ReceiverInputDStream[(String, Double)] =
ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")
randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")
运行此代码,我看到接收器正在工作(存储项目,接收到单个日志条目)。但是,saveAsTextFiles
永远不会输出值。
local[2]
我可以通过将主服务器更改为使用两个线程(更具体地说,我需要至少比注册的自定义接收器数量多一个线程才能获得任何输出。
在我看来,工作线程似乎被接收者停止了。
谁能解释这种效果,以及如何修复我的代码?