SparkAppHandle.Listener 在使用 k8s 上的 InProcessLauncher 启动 spark 应用程序时没有收到状态更改事件(使用 SparkLauncher spark-submit 侦听器观察到相同的问题,但我使用 InProcessLauncher 来避免子进程)。我的目的只是启动 spark 应用程序并确保它已成功提交并进入运行状态。
监听器代码
public class LaunchListener implements SparkAppHandle.Listener {
private boolean isLaunched = false;
private long startTime = System.currentTimeMillis();
private final long timeout;
public LaunchListener(long timeout) {
this.timeout = timeout;
}
@Override
public synchronized void stateChanged(SparkAppHandle handle) {
synchronized (this) {
if (!isLaunched) {
SparkAppHandle.State state = handle.getState();
isLaunched = state == SparkAppHandle.State.RUNNING || state == SparkAppHandle.State.SUBMITTED;
}
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
// Do Nothing
}
public boolean isLaunched() {
return isLaunched;
}
public boolean isDone() {
return isLaunched || System.currentTimeMillis() > startTime + timeout ;
}
}
启动器代码
InProcessLauncher sparkLauncher ;// initialized with correct configuration
LaunchListener listener = new LaunchListener(60_000l);
SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(listener);
while (!listener.isDone()) {
Thread.sleep(500l);
state = sparkAppHandle.getState();
}
sparkAppHandle.disconnect();
// Collect the state
当我最后收集状态时 - 由于超时约 60 秒和 listener.isLaunched() -> false,它总是到达那里。如果我们增加超时时间,那么在 spark 应用程序完成时,我们会收到 LOST 状态作为状态更改,但未收到已提交/正在运行或任何其他状态。
请注意,火花应用程序已成功启动。