3

我正在尝试使用自定义接收器编写 Spark 流应用程序。我应该通过提供具有预定义间隔的随机值来模拟实时输入数据。(简化的)接收器如下所示,相应的 Spark Streaming 应用程序代码如下:

class SparkStreamingReceiver extends Actor with ActorHelper {

  private val random = new Random()

  override def preStart = {
    context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
        self ! ("string", random.nextGaussian())
    })
  }

  override def receive = {
    case data: (String, Double) => {
      store[(String, Double)](data)
    }
  }
}
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
    .setMaster("local")

val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

val randomValues: ReceiverInputDStream[(String, Double)] =
    ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")

randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")

运行此代码,我看到接收器正在工作(存储项目接收到单个日志条目)。但是,saveAsTextFiles永远不会输出值。

local[2]我可以通过将主服务器更改为使用两个线程(更具体地说,我需要至少比注册的自定义接收器数量多一个线程才能获得任何输出。

在我看来,工作线程似乎被接收者停止了。

谁能解释这种效果,以及如何修复我的代码?

4

1 回答 1

7

每个接收器使用一个计算槽。所以 2 个接收器将需要 2 个计算槽。如果所有的计算槽都被接收器占用,那么就没有槽来处理数据了。这就是为什么带有 1 个接收器的“本地”模式和带有 2 个接收器的“本地 [2]”会停止处理。

于 2014-09-11T21:21:50.107 回答