0

SparkAppHandle.Listener 在使用 k8s 上的 InProcessLauncher 启动 spark 应用程序时没有收到状态更改事件(使用 SparkLauncher spark-submit 侦听器观察到相同的问题,但我使用 InProcessLauncher 来避免子进程)。我的目的只是启动 spark 应用程序并确保它已成功提交并进入运行状态。

监听器代码

public class LaunchListener implements SparkAppHandle.Listener {
    private boolean isLaunched = false;
    private long startTime = System.currentTimeMillis();
    private final long timeout;

    public LaunchListener(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public synchronized void stateChanged(SparkAppHandle handle) {
        synchronized (this) {
            if (!isLaunched) {
                SparkAppHandle.State state = handle.getState();
                isLaunched = state == SparkAppHandle.State.RUNNING || state == SparkAppHandle.State.SUBMITTED;   
            }
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
         // Do Nothing
    }

    public boolean isLaunched() {
        return isLaunched;
    }

    public boolean isDone() {
        return isLaunched || System.currentTimeMillis() > startTime + timeout  ;
    }
}

启动器代码

InProcessLauncher sparkLauncher ;// initialized with correct configuration
LaunchListener listener = new LaunchListener(60_000l);
SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(listener);
while (!listener.isDone()) {
    Thread.sleep(500l);
    state = sparkAppHandle.getState();
}
sparkAppHandle.disconnect();
// Collect the state

当我最后收集状态时 - 由于超时约 60 秒和 listener.isLaunched() -> false,它总是到达那里。如果我们增加超时时间,那么在 spark 应用程序完成时,我们会收到 LOST 状态作为状态更改,但未收到已提交/正在运行或任何其他状态。
请注意,火花应用程序已成功启动。

4

0 回答 0