diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js index d56ba3d09e589..39ca033fb17b7 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js @@ -24,8 +24,8 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) { return a.value - b.value }, editable: false, - showCurrentTime: false, - showCustomTime: true, + showCurrentTime: true, + min: startTime, zoomable: false }; @@ -34,10 +34,6 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) { applicationTimeline.setGroups(groups); applicationTimeline.setItems(items); - if (startTime != -1) { - applicationTimeline.setCustomTime(startTime); - } - setupZoomable("#application-timeline-zoom-lock", applicationTimeline); $(".item.range.job.application-timeline-object").each(function() { @@ -49,7 +45,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) { }); } -function drawJobTimeline(groupArray, eventObjArray, startTime) { +function drawJobTimeline(groupArray, eventObjArray) { var groups = new vis.DataSet(groupArray); var items = new vis.DataSet(eventObjArray); var container = $('#job-timeline')[0]; @@ -59,7 +55,6 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) { }, editable: false, showCurrentTime: false, - showCustomTime: true, zoomable: false, }; @@ -68,10 +63,6 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) { jobTimeline.setGroups(groups); jobTimeline.setItems(items); - if (startTime != -1) { - jobTimeline.setCustomTime(startTime) - } - setupZoomable("#job-timeline-zoom-lock", jobTimeline); $(".item.range.stage.job-timeline-object").each(function() { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 2a4a97cc5def2..3324bb16ef0bb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -29,8 +29,6 @@ import org.apache.spark.JobExecutionStatus /** Page showing list of all ongoing and recently finished jobs */ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { - private val startTime: Option[Long] = parent.sc.map(_.startTime) - private val listener = parent.listener private val JOBS_LEGEND =
Running Job
.toString.filter(_ != '\n') + private val EXECUTORS_LEGEND = +
+ + Executor Added + + Executor Removed +
.toString.filter(_ != '\n') + private def getlastStageDescription(job: JobUIData) = { val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) - .flatMap { ids => listener.stageIdToInfo.get(ids.max)} + .flatMap { ids => parent.listener.stageIdToInfo.get(ids.max)} val lastStageData = lastStageInfo.flatMap { s => - listener.stageIdToData.get((s.stageId, s.attemptId)) + parent.listener.stageIdToData.get((s.stageId, s.attemptId)) } lastStageData.flatMap(_.description).getOrElse("") } - private def makeTimeline(jobs: Seq[JobUIData], executors: Seq[ExecutorUIData]): Seq[Node] = { - - def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = { - jobUIDatas.flatMap { jobUIData => - val jobId = jobUIData.jobId - val status = jobUIData.status - val jobDescription = getlastStageDescription(jobUIData) - val submissionTimeOpt = jobUIData.submissionTime - val completionTimeOpt = jobUIData.completionTime - - if (status == JobExecutionStatus.UNKNOWN || submissionTimeOpt.isEmpty || - completionTimeOpt.isEmpty && status != JobExecutionStatus.RUNNING) { - None - } - - val submissionTime = submissionTimeOpt.get - val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis()) - val classNameByStatus = status match { - case JobExecutionStatus.SUCCEEDED => "succeeded" - case JobExecutionStatus.FAILED => "failed" - case JobExecutionStatus.RUNNING => "running" - } + private def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = { + jobUIDatas.filter { jobUIData => + jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined + }.map { jobUIData => + val jobId = jobUIData.jobId + val status = jobUIData.status + val jobDescription = getlastStageDescription(jobUIData) + val submissionTime = jobUIData.submissionTime.get + val completionTimeOpt = jobUIData.completionTime + val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis()) + val classNameByStatus = status match { + case JobExecutionStatus.SUCCEEDED => "succeeded" + case JobExecutionStatus.FAILED => "failed" + case JobExecutionStatus.RUNNING => "running" + } - val jobEventJsonAsStr = - s""" + val jobEventJsonAsStr = + s""" |{ | 'className': 'job application-timeline-object ${classNameByStatus}', | 'group': 'jobs', @@ -89,25 +90,25 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { | 'title': '${jobDescription} (Job ${jobId})\\nStatus: ${status}\\n' + | 'Submission Time: ${UIUtils.formatDate(new Date(submissionTime))}' + | '${ - if (status != JobExecutionStatus.RUNNING) { - s"""\\nCompletion Time: ${UIUtils.formatDate(new Date(completionTime))}""" - } else { - "" - } - }' + if (status != JobExecutionStatus.RUNNING) { + s"""\\nCompletion Time: ${UIUtils.formatDate(new Date(completionTime))}""" + } else { + "" + } + }' |} """.stripMargin - Some(jobEventJsonAsStr) - } + jobEventJsonAsStr } + } - def makeExecutorEvent(executorUIDatas: Seq[ExecutorUIData]): Seq[String] = { - val events = ListBuffer[String]() - executorUIDatas.foreach { event => + private def makeExecutorEvent(executorUIDatas: Seq[ExecutorUIData]): Seq[String] = { + val events = ListBuffer[String]() + executorUIDatas.foreach { event => - if (event.startTime.isDefined) { - val addedEvent = - s""" + if (event.startTime.isDefined) { + val addedEvent = + s""" |{ | 'className': 'executor added', | 'group': 'executors', @@ -116,12 +117,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { | 'title': 'Added at ${UIUtils.formatDate(new Date(event.startTime.get))}' |} """.stripMargin - events += addedEvent - } + events += addedEvent + } - if (event.finishTime.isDefined) { - val removedEvent = - s""" + if (event.finishTime.isDefined) { + val removedEvent = + s""" |{ | 'className': 'executor removed', | 'group': 'executors', @@ -137,31 +138,26 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { }' |} """.stripMargin - events += removedEvent - } + events += removedEvent } - events.toSeq } + events.toSeq + } + + private def makeTimeline( + jobs: Seq[JobUIData], + executors: Seq[ExecutorUIData], + startTime: Long): Seq[Node] = { val jobEventJsonAsStrSeq = makeJobEvent(jobs) val executorEventJsonAsStrSeq = makeExecutorEvent(executors) - val executorsLegend = -
- - Executor Added - - Executor Removed -
.toString.filter(_ != '\n') - val groupJsonArrayAsStr = s""" |[ | { | 'id': 'executors', - | 'content': '
Executors
${executorsLegend}', + | 'content': '
Executors
${EXECUTORS_LEGEND}', | }, | { | 'id': 'jobs', @@ -182,7 +178,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
++ } @@ -201,10 +197,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { def makeRow(job: JobUIData): Seq[Node] = { val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) - .flatMap { ids => listener.stageIdToInfo.get(ids.max) } - val lastStageData = lastStageInfo.flatMap { s => - listener.stageIdToData.get((s.stageId, s.attemptId)) - } + .flatMap { ids => parent.listener.stageIdToInfo.get(ids.max) } val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") val lastStageDescription = getlastStageDescription(job) @@ -252,7 +245,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def render(request: HttpServletRequest): Seq[Node] = { + val listener = parent.listener listener.synchronized { + val startTime = listener.startTime val activeJobs = listener.activeJobs.values.toSeq val completedJobs = listener.completedJobs.reverse.toSeq val failedJobs = listener.failedJobs.reverse.toSeq @@ -271,11 +266,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val summary: NodeSeq =
var content = summary - val appStartTime = content ++=

Events on Application Timeline

++ - makeTimeline(activeJobs ++ completedJobs ++ failedJobs, listener.executors) + makeTimeline(activeJobs ++ completedJobs ++ failedJobs, listener.executors, startTime) if (shouldShowActiveJobs) { content ++=

Active Jobs ({activeJobs.size})

++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index e08a82ca93dba..f45e5f251b850 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -31,7 +31,6 @@ import org.apache.spark.ui.jobs.UIData.ExecutorUIData /** Page showing statistics and stage list for a given job */ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { - private val listener = parent.listener private val STAGES_LEGEND =
Active Stage
.toString.filter(_ != '\n') - private def makeTimeline( - stages: Seq[StageInfo], - executors: Seq[ExecutorUIData], - jobSubmissionTime: Long): Seq[Node] = { + private val EXECUTORS_LEGEND = +
+ + Executor Added + + Executor Removed +
.toString.filter(_ != '\n') - def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = { - stageInfos.map { stage => - val stageId = stage.stageId - val attemptId = stage.attemptId - val name = stage.name - val status = { - if (stage.completionTime.isDefined) { - if (stage.failureReason.isDefined) { - "failed" - } else { - "succeeded" - } + private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = { + stageInfos.map { stage => + val stageId = stage.stageId + val attemptId = stage.attemptId + val name = stage.name + val status = { + if (stage.completionTime.isDefined) { + if (stage.failureReason.isDefined) { + "failed" } else { - "running" + "succeeded" } + } else { + "running" } + } - val submissionTime = stage.submissionTime.get - val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis()) + val submissionTime = stage.submissionTime.get + val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis()) - s""" + s""" |{ | 'className': 'stage job-timeline-object ${status}', | 'group': 'stages', @@ -81,24 +85,24 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { | 'title': '${name} (Stage ${stageId}.${attemptId})\\nStatus: ${status.toUpperCase}\\n' + | 'Submission Time: ${UIUtils.formatDate(new Date(submissionTime))}' + | '${ - if (status != "running") { - s"""\\nCompletion Time: ${UIUtils.formatDate(new Date(completionTime))}""" - } else { - "" - } - }' + if (status != "running") { + s"""\\nCompletion Time: ${UIUtils.formatDate(new Date(completionTime))}""" + } else { + "" + } + }' |} """.stripMargin - } } + } - def makeExecutorEvent(executorUIDatas: Seq[ExecutorUIData]): Seq[String] = { - val events = ListBuffer[String]() - executorUIDatas.foreach { event => + def makeExecutorEvent(executorUIDatas: Seq[ExecutorUIData]): Seq[String] = { + val events = ListBuffer[String]() + executorUIDatas.foreach { event => - if (event.startTime.isDefined) { - val addedEvent = - s""" + if (event.startTime.isDefined) { + val addedEvent = + s""" |{ | 'className': 'executor added', | 'group': 'executors', @@ -107,12 +111,12 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { | 'title': 'Added at ${UIUtils.formatDate(new Date(event.startTime.get))}' |} """.stripMargin - events += addedEvent - } + events += addedEvent + } - if (event.finishTime.isDefined) { - val removedEvent = - s""" + if (event.finishTime.isDefined) { + val removedEvent = + s""" |{ | 'className': 'executor removed', | 'group': 'executors', @@ -128,31 +132,22 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { }' |} """.stripMargin - events += removedEvent - } + events += removedEvent } - events.toSeq } + events.toSeq + } + private def makeTimeline(stages: Seq[StageInfo], executors: Seq[ExecutorUIData]): Seq[Node] = { val stageEventJsonAsStrSeq = makeStageEvent(stages) val executorsJsonAsStrSeq = makeExecutorEvent(executors) - val executorsLegend = -
- - Executor Added - - Executor Removed -
.toString.filter(_ != '\n') - val groupJsonArrayAsStr = s""" |[ | { | 'id': 'executors', - | 'content': '
Executors
${executorsLegend}', + | 'content': '
Executors
${EXECUTORS_LEGEND}', | }, | { | 'id': 'stages', @@ -172,11 +167,13 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { ++
++ } def render(request: HttpServletRequest): Seq[Node] = { + val listener = parent.listener + listener.synchronized { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") @@ -299,11 +296,9 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { - val jobSubmissionTime = jobData.submissionTime.getOrElse(-1L) var content = summary content ++=

Events on Job Timeline

++ - makeTimeline(activeStages ++ completedStages ++ failedStages, - listener.executors, jobSubmissionTime) + makeTimeline(activeStages ++ completedStages ++ failedStages, listener.executors) if (shouldShowActiveStages) { content ++=

Active Stages ({activeStages.size})

++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 90dedfc6017fe..e4baf6c39aac7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, HashSet, ListBuffer, TreeSet} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi @@ -50,6 +50,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { type PoolName = String type ExecutorId = String + // Applicatin: + @volatile var startTime = -1L + // Jobs: val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() @@ -555,6 +558,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { trimExecutorsIfNecessary(executors) } } + + override def onApplicationStart(appStarted: SparkListenerApplicationStart) { + startTime = appStarted.time + } } private object JobProgressListener {