[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener

If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage. Screenshot:

![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png)
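
In short (a minimal sketch with stand-in types, not the actual patch below): each Spark job id is carried together with an optional `JobUIData`, and when the listener has already evicted a job's data the page still renders a row with the job id and "-" placeholder cells.

```scala
// Hedged sketch with stand-in types (not the real Spark UI classes): keep every
// Spark job id, make the UI data optional, and fall back to "-" cells when the
// listener has already evicted the job (e.g. past the retained-jobs limit).
object DroppedJobRowSketch {
  case class JobUIData(jobId: Int, name: String) // stand-in for Spark's JobUIData
  case class SparkJobIdWithUIData(sparkJobId: Int, jobUIData: Option[JobUIData])

  def renderRow(job: SparkJobIdWithUIData): String = job.jobUIData match {
    case Some(data) => s"job ${job.sparkJobId}: ${data.name}" // data still retained: full row
    case None       => s"job ${job.sparkJobId}: -"            // dropped: id only, "-" for the rest
  }

  def main(args: Array[String]): Unit = {
    Seq(
      SparkJobIdWithUIData(41, Some(JobUIData(41, "foreachRDD at Example.scala:10"))),
      SparkJobIdWithUIData(42, None) // hypothetical job already dropped by the listener
    ).map(renderRow).foreach(println)
  }
}
```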

Author: zsxwing <[email protected]>

Closes apache#5840 from zsxwing/SPARK-7305 and squashes the following commits:

aca0ba6 [zsxwing] Fix the code style
718765e [zsxwing] Make generateNormalJobRow private
8073b03 [zsxwing] Merge branch 'master' into SPARK-7305
83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener
zsxwing authored and tdas committed May 8, 2015
1 parent 88063c6 commit 22ab70e
Showing 1 changed file with 106 additions and 30 deletions.
136 changes: 106 additions & 30 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}
import scala.xml.{NodeSeq, Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}

private def generateJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}

/**
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
* one cell, we use "rowspan" for the first row of a output op.
*/
def generateJobRow(
private def generateNormalJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkjobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
/**
* If a job is dropped by sparkListener due to exceeding the limitation, we only show the job id
* with "-" cells.
*/
private def generateDroppedJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
// In the first row, output op id and its information needs to be shown. In other rows, these
// cells will be taken up due to "rowspan".
// scalastyle:off
val prefixCells =
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Nil
}
})
// scalastyle:on

<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
{jobId.toString}
</td>
<!-- Duration -->
<td>-</td>
<!-- Stages: Succeeded/Total -->
<td>-</td>
<!-- Tasks (for all stages): Succeeded/Total -->
<td>-</td>
<!-- Error -->
<td>-</td>
</tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (sparkjobDurations.exists(_ == None)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
// If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++

val description = generateOutputOpDescription(sparkJobs)

generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}

private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
val lastStageInfo =
sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
flatMap { sparkJob => // For the first job, get the latest Stage info
if (sparkJob.stageIds.isEmpty) {
None
} else {
sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
}
}
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")

<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span> ++ Text(lastStageName)
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out spark Job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
(outputOpId,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
Expand All @@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>
