Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7305][Streaming][WebUI] Make BatchPage show friendly information when jobs are dropped by SparkListener #5840

Closed
wants to merge 4 commits into the base branch from the author's branch
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 107 additions & 30 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}
import scala.xml.{NodeSeq, Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

Expand All @@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

// Associates a Spark job id with its (possibly absent) UI data. `jobUIData` is None when the
// job's information has been dropped by the SparkListener (per the comment on
// generateDroppedJobRow, this happens when the retained-job limitation is exceeded), in which
// case the page can only show the bare job id.
private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
Expand All @@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}

/**
 * Generate a table row for one Spark job of an output operation, dispatching on whether the
 * job's UI data is still available. When the SparkListener has dropped the job's info
 * (retained-job limitation exceeded), fall back to the "dropped" row that only shows the id.
 *
 * @param outputOpId id of the output operation this job belongs to
 * @param outputOpDescription rendered description nodes for the output operation
 * @param formattedOutputOpDuration human-readable duration of the whole output operation
 * @param numSparkJobRowsInOutputOp total number of job rows spanned by the output-op cells
 * @param isFirstRow whether this row is the first job row of the output op
 * @param sparkJob the job id paired with its optional UI data
 */
private def generateJobRow(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    numSparkJobRowsInOutputOp: Int,
    isFirstRow: Boolean,
    sparkJob: SparkJobIdWithUIData): Seq[Node] = {
  // Pattern match instead of isDefined/.get: same dispatch, without the unsafe Option#get.
  sparkJob.jobUIData match {
    case Some(jobUIData) =>
      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
        numSparkJobRowsInOutputOp, isFirstRow, jobUIData)
    case None =>
      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
  }
}

/**
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
* one cell, we use "rowspan" for the first row of a output op.
*/
def generateJobRow(
private def generateNormalJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
Expand All @@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Expand Down Expand Up @@ -122,27 +129,97 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkjobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
/**
 * Generate a table row for a Spark job that was dropped by the SparkListener due to exceeding
 * the limitation on retained jobs. Only the job id is still known, so the duration, stage,
 * task, and error cells are all rendered as "-".
 *
 * @param outputOpId id of the output operation this job belongs to
 * @param outputOpDescription rendered description nodes for the output operation
 * @param formattedOutputOpDuration human-readable duration of the whole output operation
 * @param numSparkJobRowsInOutputOp total number of job rows spanned by the output-op cells
 * @param isFirstRow whether this row is the first job row of the output op (only the first
 *                   row renders the shared, row-spanning output-op cells)
 * @param jobId the Spark job id to display
 */
private def generateDroppedJobRow(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    numSparkJobRowsInOutputOp: Int,
    isFirstRow: Boolean,
    jobId: Int): Seq[Node] = {
  // In the first row, output op id and its information needs to be shown. In other rows, these
  // cells will be taken up due to "rowspan".
  // scalastyle is disabled here because the row-spanning <td> literals exceed the line limit.
  // scalastyle:off
  val prefixCells =
    if (isFirstRow) {
      <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
      <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
      <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
    } else {
      Nil
    }
  // scalastyle:on

  <tr>
    {prefixCells}
    <td sorttable_customkey={jobId.toString}>
      {jobId.toString}
    </td>
    <!-- Duration -->
    <td>-</td>
    <!-- Stages: Succeeded/Total -->
    <td>-</td>
    <!-- Tasks (for all stages): Succeeded/Total -->
    <td>-</td>
    <!-- Error -->
    <td>-</td>
  </tr>
}

/**
 * Generate all table rows for one output operation. The first job row carries the shared,
 * row-spanning output-op cells (id, description, duration); the remaining rows carry only the
 * per-job cells.
 *
 * NOTE(review): the scraped diff interleaved deleted and added lines here; this body is the
 * reconstructed new-side version, with the Option handling made idiomatic.
 */
private def generateOutputOpIdRow(
    outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
  // We don't count the durations of dropped jobs (those whose JobUIData is gone).
  // A None entry means a job that has not finished (no submission/completion pair yet).
  val sparkJobDurations: Seq[Option[Long]] =
    sparkJobs.flatMap(_.jobUIData).map { sparkJob =>
      sparkJob.submissionTime.map { start =>
        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
        end - start
      }
    }
  val formattedOutputOpDuration =
    if (sparkJobDurations.isEmpty || sparkJobDurations.contains(None)) {
      // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
      "-"
    } else {
      SparkUIUtils.formatDuration(sparkJobDurations.flatten.sum)
    }

  val description = generateOutputOpDescription(sparkJobs)

  // Assumes at least one Spark job per output op (callers build this sequence from
  // grouped OutputOpIdAndSparkJobId pairs) — TODO confirm, otherwise `head` would throw.
  generateJobRow(
    outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
    sparkJobs.tail.flatMap { sparkJob =>
      generateJobRow(
        outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
    }
}

/**
 * Build the description nodes for an output operation: the description of the latest stage of
 * the first job that still has UI data, followed by that stage's name. Falls back to
 * "(Unknown Stage Name)" / an empty description when the stage info or data is unavailable.
 *
 * NOTE(review): the original chained `headOption. // comment \n flatMap {...}` continuation was
 * flagged as hard to read; naming the intermediate value below addresses that nit.
 */
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
  // The first job of this output op that still has UI data, if any.
  val firstJobUIData = sparkJobs.flatMap(_.jobUIData).headOption
  // Info of that job's latest stage (the stage with the max id), when stages exist and the
  // listener still retains the stage info.
  val lastStageInfo = firstJobUIData.flatMap { jobUIData =>
    if (jobUIData.stageIds.isEmpty) {
      None
    } else {
      sparkListener.stageIdToInfo.get(jobUIData.stageIds.max)
    }
  }
  val lastStageData = lastStageInfo.flatMap { s =>
    sparkListener.stageIdToData.get((s.stageId, s.attemptId))
  }

  val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
  val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")

  <span class="description-input" title={lastStageDescription}>
    {lastStageDescription}
  </span> ++ Text(lastStageName)
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
Expand Down Expand Up @@ -187,10 +264,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out spark Job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
(outputOpId,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
Expand All @@ -200,7 +277,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>
Expand Down