3

在 Spark 2.0.x 上,我一直在使用JobProgressListener实现从我们的集群中实时检索 Job/Stage/Task 进度信息。我了解事件流程的工作原理,并成功接收工作更新。

我的问题是我们在同一个 Spark Context 上同时运行了几个不同的提交,并且似乎无法区分每个提交属于哪个 Job/Stage/Task。每个 Job/Stage/Task 都会收到一个唯一的 id,这很棒。但是,我正在寻找一种方法来提供将与收到的 JobProgressListener 事件对象一起返回的提交“id”或“name”。

我意识到可以在 Spark Context 上设置“作业组”,但是如果多个作业同时在同一个上下文上运行,它们就会变得混乱。

有没有一种方法可以潜入将与单个 SQLContext 的侦听器事件一起返回的自定义属性?这样做,我应该能够连接后续的 Stage 和 Task 事件并得到我需要的东西。

请注意:我没有为这些工作使用 spark-submit。它们是使用对 SparkSession/SQLContext 的 Java 引用执行的。

感谢您提供任何解决方案或想法。

4

1 回答 1

0

I'm using a local property - this can be accessed from listener during the onStageSubmit event. After that I'm using the corresponding stageId in order to identify the task executed during that stage.

Future({
      sc.setLocalProperty("job-context", "second")
      val listener = new MetricListener("second")
      sc.addSparkListener(listener)
      //do some spark actions
      val df = spark.read.load("...")
      val countResult = df.filter(....).count()
      println(listener.rows)
      sc.removeSparkListener(listener)
    })

class MetricListener(name:String) extends SparkListener{

  var rows: Long = 0L
  var stageId = -1
  
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    if (stageSubmitted.properties.getProperty("job-context") == name){
      stageId = stageSubmitted.stageInfo.stageId
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    if (taskEnd.stageId == stageId)
      rows = rows + taskEnd.taskMetrics.inputMetrics.recordsRead
  }
  
}
于 2021-06-15T08:09:52.157 回答