diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index b0f8ca2ab0d3f..1da7a988203db 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -33,6 +33,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq + val pendingStages = listener.pendingStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq @@ -43,6 +44,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) + val pendingStagesTable = + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) @@ -72,6 +77,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Active Stages: {activeStages.size} +
  • + Pending Stages: + {pendingStages.size} +
  • Completed Stages: {numCompletedStages} @@ -91,6 +100,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { }} ++

    Active Stages ({activeStages.size})

    ++ activeStagesTable.toNodeSeq ++ +

    Pending Stages ({pendingStages.size})

    ++ + pendingStagesTable.toNodeSeq ++

    Completed Stages ({numCompletedStages})

    ++ completedStagesTable.toNodeSeq ++

    Failed Stages ({numFailedStages})

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b0d3bed1300b3..4d200eeda86b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() @@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageIds = jobStart.stageIds, jobGroup = jobGroup, status = JobExecutionStatus.RUNNING) + jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x) // Compute (a potential underestimate of) the number of tasks that will be run by this job. // This may be an underestimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) + jobData.stageIds.foreach(pendingStages.remove) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage - + pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME)