Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7305][Streaming][WebUI] Make BatchPage show friendly information when jobs are dropped by SparkListener #5840

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 106 additions & 30 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}
import scala.xml.{NodeSeq, Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

Expand All @@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

// Pairs a Spark job id with its UI data, if any. `jobUIData` is None when the
// SparkListener has dropped the job's data (see the "dropped job" handling below),
// so downstream rendering code can distinguish live jobs from evicted ones.
private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
Expand All @@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}

/**
 * Generate the table row for one Spark job of an output operation.
 *
 * Delegates to [[generateNormalJobRow]] when the job's UI data is still tracked by the
 * listener, and to [[generateDroppedJobRow]] when the data has been dropped (in which
 * case only the job id and placeholder cells can be shown).
 *
 * @param outputOpId id of the output operation this job belongs to
 * @param outputOpDescription rendered description cell for the output operation
 * @param formattedOutputOpDuration human-readable duration of the whole output operation
 * @param numSparkJobRowsInOutputOp number of job rows spanned by the output-op cells
 * @param isFirstRow whether this is the first job row of the output operation
 * @param sparkJob the job id together with its (possibly dropped) UI data
 */
private def generateJobRow(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    numSparkJobRowsInOutputOp: Int,
    isFirstRow: Boolean,
    sparkJob: SparkJobIdWithUIData): Seq[Node] = {
  // Pattern match instead of isDefined/get: exhaustive and avoids the Option.get
  // anti-pattern.
  sparkJob.jobUIData match {
    case Some(jobUIData) =>
      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
        numSparkJobRowsInOutputOp, isFirstRow, jobUIData)
    case None =>
      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
  }
}

/**
 * Generate a row for a Spark Job. Because duplicated output op info needs to be collapsed into
 * one cell, we use "rowspan" for the first row of an output op.
 */
def generateJobRow(
private def generateNormalJobRow(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
Expand All @@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Expand Down Expand Up @@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkjobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
/**
 * Render a row for a Spark job whose UI data has been dropped by sparkListener due to
 * exceeding its retention limit: only the job id is shown, with "-" placeholder cells
 * for the duration, stage, task, and error columns.
 *
 * @param outputOpId id of the output operation this job belongs to
 * @param outputOpDescription rendered description cell for the output operation
 * @param formattedOutputOpDuration human-readable duration of the whole output operation
 * @param numSparkJobRowsInOutputOp number of job rows spanned by the output-op cells
 * @param isFirstRow whether this is the first job row of the output operation; only the
 *                   first row emits the rowspan-ed output-op cells
 * @param jobId the Spark job id to display
 */
private def generateDroppedJobRow(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    numSparkJobRowsInOutputOp: Int,
    isFirstRow: Boolean,
    jobId: Int): Seq[Node] = {
  // In the first row, output op id and its information needs to be shown. In other rows, these
  // cells will be taken up due to "rowspan".
  // scalastyle:off
  val prefixCells =
    if (isFirstRow) {
      <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
      <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
      <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
    } else {
      Nil
    }
  // scalastyle:on

  <tr>
    {prefixCells}
    <td sorttable_customkey={jobId.toString}>
      {jobId.toString}
    </td>
    <!-- Duration -->
    <td>-</td>
    <!-- Stages: Succeeded/Total -->
    <td>-</td>
    <!-- Tasks (for all stages): Succeeded/Total -->
    <td>-</td>
    <!-- Error -->
    <td>-</td>
  </tr>
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (sparkjobDurations.exists(_ == None)) {
// If any job does not finish, set "formattedOutputOpDuration" to "-"
if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
// If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
"-"
} else {
SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++

val description = generateOutputOpDescription(sparkJobs)

generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}

/**
 * Build the description cell for an output operation from the last stage of its first
 * job that still has UI data: a hover-able span with the stage description, followed
 * by the stage name. Falls back to an empty description and "(Unknown Stage Name)"
 * when the stage info or data is unavailable.
 */
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
  // Resolve the latest StageInfo of the first job whose UI data is still tracked.
  val latestStageInfo = for {
    firstJob <- sparkJobs.flatMap(_.jobUIData).headOption
    maxStageId <- firstJob.stageIds.reduceOption(_ max _)
    stageInfo <- sparkListener.stageIdToInfo.get(maxStageId)
  } yield stageInfo

  val latestStageData = latestStageInfo.flatMap { stageInfo =>
    sparkListener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
  }

  val stageName = latestStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
  val stageDescription = latestStageData.flatMap(_.description).getOrElse("")

  <span class="description-input" title={stageDescription}>
    {stageDescription}
  </span> ++ Text(stageName)
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
Expand Down Expand Up @@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out spark Job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
(outputOpId,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}

<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
Expand All @@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>
Expand Down