3

我有一个 Web 应用程序,它将使用 spark 启动器库在 Cloudera 火花集群上提交火花作业。

它正在成功地将 spark 作业提交到集群。但是,它不会回调侦听器类方法,并且即使在集群上的作业完成执行后getState(),返回的SparkAppHandle也不会从“未知”更改。

我正在使用yarn-cluster模式。这是我的代码。还有什么需要做的吗?

SparkLauncher launcher = new SparkLauncher()
                           .setSparkHome("sparkhome")
                           .setMaster("yarn-cluster")
                           .setAppResource("spark job jar file")
                           .setMainClass("spark job driver class")
                           .setAppName("appname")
                           .addAppArgs(argsArray)
                           .setVerbose(true)
                           .addSparkArg("--verbose");

SparkAppHandle handle = launcher.startApplication(new LauncherListener());

int c = 0;
while(!handle.getState().isFinal()) {
  LOG.info(">>>>>>>> state is=  "+handle.getState() );
  LOG.info(">>>>>>>> state is not final yet. counter=  "+c++ );
  LOG.info(">>>>>>>> sleeping for a second");
  try {
    Thread.sleep(1000L);
  } catch (InterruptedException e) {
  }
  if(c == 200)
    break;
}

以下是我已经尝试过的事情:

  1. SparkAppHandle在应用程序启动后添加侦听器实例。
  2. 使当前类实现SparkAppHandle.Listener并以两种方式传递它(this)(在启动时,并通过设置它SparkAppHandle
  3. 尝试使用launcher.launch()方法,以便至少我可以Process通过调用process.waitFor()方法阻止生成的对象,直到火花作业在集群上完成运行。但是在这种情况下,对于长时间运行的 spark 作业,此节点上的相应进程永远不会返回(尽管它对于在 1 或 2 分钟内完成的 spark 作业工作正常)
4

1 回答 1

0

我从 spark 用户邮件列表中得到了这个问题的答案。要使此功能正常工作,不仅 spark 启动器需要为 1.6.0,而且底层 spark 至少应为 1.6.0。

我一直在使用 spark 1.5.1 和 1.6.0 版本的启动器库。现在我已将 spark 集群更新到 1.6.0 版本,现在我正在获取侦听器方法的回调。

于 2016-11-08T05:10:10.270 回答