我有一个 Web 应用程序,它将使用 spark 启动器库在 Cloudera 火花集群上提交火花作业。
它正在成功地将 spark 作业提交到集群。但是,它不会回调侦听器类方法,并且即使在集群上的作业完成执行后getState()
,返回的SparkAppHandle也不会从“未知”更改。
我正在使用yarn-cluster
模式。这是我的代码。还有什么需要做的吗?
SparkLauncher launcher = new SparkLauncher()
.setSparkHome("sparkhome")
.setMaster("yarn-cluster")
.setAppResource("spark job jar file")
.setMainClass("spark job driver class")
.setAppName("appname")
.addAppArgs(argsArray)
.setVerbose(true)
.addSparkArg("--verbose");
SparkAppHandle handle = launcher.startApplication(new LauncherListener());
int c = 0;
while(!handle.getState().isFinal()) {
LOG.info(">>>>>>>> state is= "+handle.getState() );
LOG.info(">>>>>>>> state is not final yet. counter= "+c++ );
LOG.info(">>>>>>>> sleeping for a second");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
if(c == 200)
break;
}
以下是我已经尝试过的事情:
SparkAppHandle
在应用程序启动后添加侦听器实例。- 使当前类实现
SparkAppHandle.Listener
并以两种方式传递它(this
)(在启动时,并通过设置它SparkAppHandle
) - 尝试使用
launcher.launch()
方法,以便至少我可以Process
通过调用process.waitFor()
方法阻止生成的对象,直到火花作业在集群上完成运行。但是在这种情况下,对于长时间运行的 spark 作业,此节点上的相应进程永远不会返回(尽管它对于在 1 或 2 分钟内完成的 spark 作业工作正常)