Skip to content

Commit

Permalink
Made submissionTime in SparkListenerJobStartas and completionTime in …
Browse files Browse the repository at this point in the history
…SparkListenerJobEnd as regular Long

Added a test case for checking backward compatibility
  • Loading branch information
sarutak committed Jan 14, 2015
1 parent 0412a6a commit da8bd14
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), jobResult))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
}
}

Expand Down Expand Up @@ -710,7 +710,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobFailed(error)))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}

Expand Down Expand Up @@ -749,7 +749,7 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = Some(clock.getTime())
val jobSubmissionTime = clock.getTime()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(
Expand Down Expand Up @@ -969,7 +969,7 @@ class DAGScheduler(
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobSucceeded))
SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
}

// taskSucceeded runs some user code that might throw an exception. Make sure
Expand Down Expand Up @@ -1238,7 +1238,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobFailed(error)))
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class SparkListenerTaskEnd(
@DeveloperApi
case class SparkListenerJobStart(
jobId: Int,
time: Option[Long],
time: Long,
stageInfos: Seq[StageInfo],
properties: Properties = null)
extends SparkListenerEvent {
Expand All @@ -70,7 +70,7 @@ case class SparkListenerJobStart(
@DeveloperApi
case class SparkListenerJobEnd(
jobId: Int,
time: Option[Long],
time: Long,
jobResult: JobResult)
extends SparkListenerEvent

Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import scala.xml.{Node, NodeSeq}

import javax.servlet.http.HttpServletRequest

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData.JobUIData

Expand Down Expand Up @@ -51,13 +50,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
job.startTime.map { start =>
val end = job.endTime.getOrElse(System.currentTimeMillis())
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val detailUrl =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr>
Expand All @@ -68,7 +67,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<div><em>{lastStageDescription}</em></div>
<a href={detailUrl}>{lastStageName}</a>
</td>
<td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
Expand Down Expand Up @@ -101,11 +100,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val now = System.currentTimeMillis

val activeJobsTable =
jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
val completedJobsTable =
jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val failedJobsTable =
jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)

val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
startTime = jobStart.time,
endTime = None,
submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
Expand Down Expand Up @@ -186,7 +185,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
jobData.endTime = jobEnd.time
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)

jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
Expand Down Expand Up @@ -309,7 +309,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
// compeletion event is for. Let's just drop it here. This means we might have some speculation
// completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ private[jobs] object UIData {

class JobUIData(
var jobId: Int = -1,
var startTime: Option[Long] = None,
var endTime: Option[Long] = None,
var submissionTime: Option[Long] = None,
var completionTime: Option[Long] = None,
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
/* Tasks */
// `numTasks` is a potential underestimate of the true number of tasks that this job will run.
// This may be an underestimate because the job start event references all of the result
// stages's transitive stage dependencies, but some of these stages might be skipped if their
// stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
var numTasks: Int = 0,
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
import org.apache.hadoop.hdfs.web.JsonUtil

/**
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
Expand Down Expand Up @@ -471,7 +472,8 @@ private[spark] object JsonProtocol {

def jobStartFromJson(json: JValue): SparkListenerJobStart = {
val jobId = (json \ "Job ID").extract[Int]
val submissionTime = (json \ "Submission Time").extractOpt[Long]
val submissionTime =
Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
val properties = propertiesFromJson(json \ "Properties")
// The "Stage Infos" field was added in Spark 1.2.0
Expand All @@ -484,7 +486,8 @@ private[spark] object JsonProtocol {

def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
val jobId = (json \ "Job ID").extract[Int]
val completionTime = (json \ "Completion Time").extractOpt[Long]
val completionTime =
Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
val jobResult = jobResultFromJson(json \ "Job Result")
SparkListenerJobEnd(jobId, completionTime, jobResult)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val jobCompletionTime = Option(1421191296660L)
val jobCompletionTime = 1421191296660L

before {
sc = new SparkContext("local", "SparkListenerSuite")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.util.Utils

class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

val jobSubmissionTime = Option(1421191042750L)
val jobCompletionTime = Option(1421191296660L)
val jobSubmissionTime = 1421191042750L
val jobCompletionTime = 1421191296660L

private def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
Expand Down
22 changes: 20 additions & 2 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import org.apache.spark.storage._

class JsonProtocolSuite extends FunSuite {

val jobSubmissionTime = Option(1421191042750L)
val jobCompletionTime = Option(1421191296660L)
val jobSubmissionTime = 1421191042750L
val jobCompletionTime = 1421191296660L

test("SparkListenerEvent") {
val stageSubmitted =
Expand Down Expand Up @@ -250,6 +250,24 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
}

test("SparkListenerJobStart and SparkListenerJobEnd backward compatibility") {
// Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
// Also, SparkListenerJobEnd did not have a "Completion Time" property.
val stageIds = Seq[Int](1, 2, 3, 4)
val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
.removeField({ _._1 == "Submission Time"})
val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))

val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
.removeField({ _._1 == "Completion Time"})
val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}

/** -------------------------- *
| Helper test running methods |
* --------------------------- */
Expand Down

0 comments on commit da8bd14

Please sign in to comment.