From 2e8a141b5ace2d1c0c89168aa235b0490ec7d763 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 7 May 2015 17:34:44 -0700 Subject: [PATCH] [SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage. Screenshot: ![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png) Author: zsxwing Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits: aca0ba6 [zsxwing] Fix the code style 718765e [zsxwing] Make generateNormalJobRow private 8073b03 [zsxwing] Merge branch 'master' into SPARK-7305 83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener (cherry picked from commit 22ab70e06ede65ca865073fe36c859042a920aa3) Signed-off-by: Tathagata Das --- .../apache/spark/streaming/ui/BatchPage.scala | 136 ++++++++++++++---- 1 file changed, 106 insertions(+), 30 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 3f1cab69068dc..831f60e870f74 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui import javax.servlet.http.HttpServletRequest -import scala.xml.{NodeSeq, Node} +import scala.xml.{NodeSeq, Node, Text} import org.apache.commons.lang3.StringEscapeUtils @@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId} import org.apache.spark.ui.jobs.UIData.JobUIData +private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData]) private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { private val streamingListener = parent.listener @@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { Error } + private def generateJobRow( + outputOpId: OutputOpId, + outputOpDescription: Seq[Node], + formattedOutputOpDuration: String, + numSparkJobRowsInOutputOp: Int, + isFirstRow: Boolean, + sparkJob: SparkJobIdWithUIData): Seq[Node] = { + if (sparkJob.jobUIData.isDefined) { + generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, + numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get) + } else { + generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration, + numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId) + } + } + /** * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into * one cell, we use "rowspan" for the first row of a output op. */ - def generateJobRow( + private def generateNormalJobRow( outputOpId: OutputOpId, + outputOpDescription: Seq[Node], formattedOutputOpDuration: String, numSparkJobRowsInOutputOp: Int, isFirstRow: Boolean, sparkJob: JobUIData): Seq[Node] = { - val lastStageInfo = Option(sparkJob.stageIds) - .filter(_.nonEmpty) - .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) } - val lastStageData = lastStageInfo.flatMap { s => - sparkListener.stageIdToData.get((s.stageId, s.attemptId)) - } - - val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") - val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("") val duration: Option[Long] = { sparkJob.submissionTime.map { start => val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) @@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { if (isFirstRow) { {outputOpId.toString} - - {lastStageDescription} - {lastStageName} + {outputOpDescription} {formattedOutputOpDuration} } else { @@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } - private def generateOutputOpIdRow( - outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = { - val sparkjobDurations = sparkJobs.map(sparkJob => { - sparkJob.submissionTime.map { start => - val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) - end - start + /** + * If a job is dropped by sparkListener due to exceeding the limitation, we only show the job id + * with "-" cells. + */ + private def generateDroppedJobRow( + outputOpId: OutputOpId, + outputOpDescription: Seq[Node], + formattedOutputOpDuration: String, + numSparkJobRowsInOutputOp: Int, + isFirstRow: Boolean, + jobId: Int): Seq[Node] = { + // In the first row, output op id and its information needs to be shown. In other rows, these + // cells will be taken up due to "rowspan". + // scalastyle:off + val prefixCells = + if (isFirstRow) { + {outputOpId.toString} + {outputOpDescription} + {formattedOutputOpDuration} + } else { + Nil } - }) + // scalastyle:on + + + {prefixCells} + + {jobId.toString} + + + - + + - + + - + + - + + } + + private def generateOutputOpIdRow( + outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { + // We don't count the durations of dropped jobs + val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get). + map(sparkJob => { + sparkJob.submissionTime.map { start => + val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis()) + end - start + } + }) val formattedOutputOpDuration = - if (sparkjobDurations.exists(_ == None)) { - // If any job does not finish, set "formattedOutputOpDuration" to "-" + if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) { + // If no job or any job does not finish, set "formattedOutputOpDuration" to "-" "-" } else { - SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum) + SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum) } - generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++ + + val description = generateOutputOpDescription(sparkJobs) + + generateJobRow( + outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++ sparkJobs.tail.map { sparkJob => - generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob) + generateJobRow( + outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob) }.flatMap(x => x) } + private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = { + val lastStageInfo = + sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData + flatMap { sparkJob => // For the first job, get the latest Stage info + if (sparkJob.stageIds.isEmpty) { + None + } else { + sparkListener.stageIdToInfo.get(sparkJob.stageIds.max) + } + } + val lastStageData = lastStageInfo.flatMap { s => + sparkListener.stageIdToData.get((s.stageId, s.attemptId)) + } + + val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("") + + + {lastStageDescription} + ++ Text(lastStageName) + } + private def failureReasonCell(failureReason: String): Seq[Node] = { val isMultiline = failureReason.indexOf('\n') >= 0 // Display the first line by default @@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted) } sparkListener.synchronized { - val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = + val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] = outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) => - // Filter out spark Job ids that don't exist in sparkListener - (outputOpId, sparkJobIds.flatMap(getJobData)) + (outputOpId, + sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId)))) } @@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { { outputOpIdWithJobs.map { - case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs) + case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds) } }