
Commit

Serialize events both to and from JSON (rather than just to)
This requires every field of every event to be completely reconstructible from its
JSON representation. This commit may contain incomplete state.
andrewor14 committed Feb 15, 2014
1 parent bf0b2e9 commit de8a1cd
Showing 18 changed files with 713 additions and 386 deletions.
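The commit message above boils down to a round-trip property: whatever an event's toJson emits, a matching fromJson must be able to rebuild an equal value. As a rough, hedged sketch of that pattern with lift-json (the Event and StageCompleted names below are invented for illustration and are not part of this commit):

```scala
import net.liftweb.json.JsonAST._
import net.liftweb.json.JsonDSL._
import net.liftweb.json.{DefaultFormats, JsonParser, Printer}

// Illustrative stand-ins, not Spark classes: each event writes a class-name
// discriminator plus its fields, and the companion reads them back.
sealed trait Event { def toJson: JValue }

case class StageCompleted(stageId: Int) extends Event {
  override def toJson: JValue = ("Event" -> "StageCompleted") ~ ("Stage ID" -> stageId)
}

object Event {
  private implicit val formats = DefaultFormats
  def fromJson(json: JValue): Event = (json \ "Event").extract[String] match {
    case "StageCompleted" => StageCompleted((json \ "Stage ID").extract[Int])
  }
}

object RoundTripCheck extends App {
  // Round-trip through a compact JSON string, the way an event log would.
  val event = StageCompleted(3)
  val restored = Event.fromJson(JsonParser.parse(Printer.compact(render(event.toJson))))
  assert(restored == event)  // the property this commit is after
}
```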
100 changes: 87 additions & 13 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,50 +19,124 @@ package org.apache.spark

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.JsonSerializable
import org.apache.spark.util.Utils

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST._
import net.liftweb.json.DefaultFormats

/**
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
private[spark] sealed trait TaskEndReason
sealed trait TaskEndReason extends JsonSerializable {
  override def toJson = "Reason" -> Utils.getFormattedClassName(this)
}

case object TaskEndReason {
  def fromJson(json: JValue): TaskEndReason = {
    implicit val format = DefaultFormats
    val success = Utils.getFormattedClassName(Success)
    val resubmitted = Utils.getFormattedClassName(Resubmitted)
    val fetchFailed = Utils.getFormattedClassName(FetchFailed)
    val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
    val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
    val taskKilled = Utils.getFormattedClassName(TaskKilled)
    val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
    val unknownReason = Utils.getFormattedClassName(UnknownReason)

    (json \ "Reason").extract[String] match {
      case `success` => Success
      case `resubmitted` => Resubmitted
      case `fetchFailed` => fetchFailedFromJson(json)
      case `exceptionFailure` => exceptionFailureFromJson(json)
      case `taskResultLost` => TaskResultLost
      case `taskKilled` => TaskKilled
      case `executorLostFailure` => ExecutorLostFailure
      case `unknownReason` => UnknownReason
    }
  }

private[spark] case object Success extends TaskEndReason
  private def fetchFailedFromJson(json: JValue): TaskEndReason = {
    implicit val format = DefaultFormats
    new FetchFailed(
      BlockManagerId.fromJson(json \ "Block Manager Address"),
      (json \ "Shuffle ID").extract[Int],
      (json \ "Map ID").extract[Int],
      (json \ "Reduce ID").extract[Int])
  }

private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
  private def exceptionFailureFromJson(json: JValue): TaskEndReason = {
    implicit val format = DefaultFormats
    val metrics = (json \ "Metrics") match {
      case JNothing => None
      case value: JValue => Some(TaskMetrics.fromJson(value))
    }
    val stackTrace = Utils.stackTraceFromJson(json \ "Stack Trace")
    new ExceptionFailure(
      (json \ "Class Name").extract[String],
      (json \ "Description").extract[String],
      stackTrace,
      metrics
    )
  }
}

private[spark] case class FetchFailed(
case object Success extends TaskEndReason

// Task was finished earlier but we've now lost it
case object Resubmitted extends TaskEndReason

case class FetchFailed(
    bmAddress: BlockManagerId,
    shuffleId: Int,
    mapId: Int,
    reduceId: Int)
  extends TaskEndReason
  extends TaskEndReason {
  override def toJson = {
    super.toJson ~
      ("Block Manager Address" -> bmAddress.toJson) ~
      ("Shuffle ID" -> shuffleId) ~
      ("Map ID" -> mapId) ~
      ("Reduce ID" -> reduceId)
  }
}

private[spark] case class ExceptionFailure(
case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement],
    metrics: Option[TaskMetrics])
  extends TaskEndReason
  extends TaskEndReason {
  override def toJson = {
    val stackTraceJson = Utils.stackTraceToJson(stackTrace)
    val metricsJson = metrics.map(_.toJson).getOrElse(JNothing)
    super.toJson ~
      ("Class Name" -> className) ~
      ("Description" -> description) ~
      ("Stack Trace" -> stackTraceJson) ~
      ("Metrics" -> metricsJson)
  }
}

/**
* The task finished successfully, but the result was lost from the executor's block manager before
* it was fetched.
*/
private[spark] case object TaskResultLost extends TaskEndReason
case object TaskResultLost extends TaskEndReason

private[spark] case object TaskKilled extends TaskEndReason
case object TaskKilled extends TaskEndReason

/**
* The task failed because the executor that it was running on was lost. This may happen because
* the task crashed the JVM.
*/
private[spark] case object ExecutorLostFailure extends TaskEndReason
case object ExecutorLostFailure extends TaskEndReason

/**
* We don't know why the task ended -- for example, because of a ClassNotFound exception when
* deserializing the task result.
*/
private[spark] case object UnknownReason extends TaskEndReason

case object UnknownReason extends TaskEndReason
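For the file above, the intended usage is a straight round trip: the JValue a reason's toJson produces, once written out and parsed back, should give TaskEndReason.fromJson enough to rebuild an equal value. A hedged sketch, assuming Utils.getFormattedClassName yields the same string on both the write and read side (the wrapper object below is just scaffolding for the example):

```scala
import net.liftweb.json.JsonAST.render
import net.liftweb.json.{JsonParser, Printer}
import org.apache.spark.{TaskEndReason, TaskKilled}

object TaskEndReasonRoundTrip extends App {
  val reason: TaskEndReason = TaskKilled
  // Write out as a compact JSON string, then parse and reconstruct.
  val json = Printer.compact(render(reason.toJson))
  val restored = TaskEndReason.fromJson(JsonParser.parse(json))
  assert(restored == TaskKilled)
}
```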
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -225,10 +225,10 @@ private[spark] class Executor(

        for (m <- task.metrics) {
          m.hostname = Utils.localHostName()
          m.executorDeserializeTime = (taskStart - startTime).toInt
          m.executorRunTime = (taskFinish - taskStart).toInt
          m.executorDeserializeTime = taskStart - startTime
          m.executorRunTime = taskFinish - taskStart
          m.jvmGCTime = gcTime - startGCTime
          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
          m.resultSerializationTime = afterSerialization - beforeSerialization
        }

        val accumUpdates = Accumulators.values
@@ -264,7 +264,7 @@
        }

        case t: Throwable => {
          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
          val serviceTime = System.currentTimeMillis() - taskStart
          val metrics = attemptedTask.flatMap(t => t.metrics)
          for (m <- metrics) {
            m.executorRunTime = serviceTime
61 changes: 55 additions & 6 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -21,6 +21,7 @@ import org.apache.spark.scheduler.JsonSerializable

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST._
import net.liftweb.json.DefaultFormats

class TaskMetrics extends Serializable with JsonSerializable {
  /**
@@ -31,12 +32,12 @@ class TaskMetrics extends Serializable with JsonSerializable {
  /**
   * Time taken on the executor to deserialize this task
   */
  var executorDeserializeTime: Int = _
  var executorDeserializeTime: Long = _

  /**
   * Time the executor spends actually running the task (including fetching shuffle data)
   */
  var executorRunTime: Int = _
  var executorRunTime: Long = _

  /**
   * The number of bytes this task transmitted back to the driver as the TaskResult
@@ -88,10 +89,6 @@ class TaskMetrics extends Serializable with JsonSerializable {
}
}

object TaskMetrics {
  private[spark] def empty(): TaskMetrics = new TaskMetrics
}

class ShuffleReadMetrics extends Serializable with JsonSerializable {
  /**
   * Absolute time when this task finished reading shuffle data
@@ -159,3 +156,55 @@ class ShuffleWriteMetrics extends Serializable with JsonSerializable {
("Shuffle Write Time" -> shuffleWriteTime)
}
}

object TaskMetrics {
  private[spark] def empty(): TaskMetrics = new TaskMetrics

  def fromJson(json: JValue): TaskMetrics = {
    implicit val format = DefaultFormats
    val metrics = new TaskMetrics
    metrics.hostname = (json \ "Host Name").extract[String]
    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
    metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
    metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
    metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
    metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
    metrics.shuffleReadMetrics =
      json \ "Shuffle Read Metrics" match {
        case JNothing => None
        case value: JValue => Some(ShuffleReadMetrics.fromJson(value))
      }
    metrics.shuffleWriteMetrics =
      json \ "Shuffle Write Metrics" match {
        case JNothing => None
        case value: JValue => Some(ShuffleWriteMetrics.fromJson(value))
      }
    metrics
  }
}

object ShuffleReadMetrics {
  def fromJson(json: JValue): ShuffleReadMetrics = {
    implicit val format = DefaultFormats
    val metrics = new ShuffleReadMetrics
    metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
    metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
    metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
    metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
    metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
    metrics.remoteFetchTime = (json \ "Remote Fetch Time").extract[Long]
    metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
    metrics
  }
}

object ShuffleWriteMetrics {
  def fromJson(json: JValue): ShuffleWriteMetrics = {
    implicit val format = DefaultFormats
    val metrics = new ShuffleWriteMetrics
    metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
    metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
    metrics
  }
}
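One detail worth calling out in the hunk above: optional nested metrics are emitted as JNothing when absent, and fromJson maps a missing field back to None. A hedged sketch of that behaviour, assuming TaskMetrics.toJson (not shown in this diff) writes the same field names that fromJson reads:

```scala
import net.liftweb.json.JsonAST.render
import net.liftweb.json.{JsonParser, Printer}
import org.apache.spark.executor.TaskMetrics

object TaskMetricsRoundTrip extends App {
  val metrics = new TaskMetrics
  metrics.hostname = "localhost"
  metrics.executorRunTime = 42L
  // shuffleReadMetrics / shuffleWriteMetrics deliberately left as None

  val restored = TaskMetrics.fromJson(JsonParser.parse(Printer.compact(render(metrics.toJson))))
  assert(restored.executorRunTime == 42L)
  assert(restored.shuffleReadMetrics.isEmpty)  // JNothing on the wire comes back as None
}
```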
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -250,7 +250,7 @@ class DAGScheduler(
      new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stageToInfos(stage) = new StageInfo(stage)
    stageToInfos(stage) = StageInfo.fromStage(stage)
    stage
  }

@@ -826,7 +826,7 @@ class DAGScheduler(
        Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
      }
      pendingTasks(stage) -= task
      stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
      stageToInfos(stage).taskInfo += event.taskInfo -> event.taskMetrics
      task match {
        case rt: ResultTask[_, _] =>
          resultStageToJob.get(stage) match {
35 changes: 30 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -18,21 +18,46 @@
package org.apache.spark.scheduler

import net.liftweb.json.JsonDSL._
import org.apache.spark.util.Utils
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.DefaultFormats

/**
* A result of a job in the DAGScheduler.
*/
private[spark] sealed trait JobResult extends JsonSerializable
private[spark] sealed trait JobResult extends JsonSerializable {
  override def toJson = "Result" -> Utils.getFormattedClassName(this)
}

private[spark] object JobResult {
  def fromJson(json: JValue): JobResult = {
    implicit val format = DefaultFormats
    val jobSucceededString = Utils.getFormattedClassName(JobSucceeded)
    val jobFailedString = Utils.getFormattedClassName(JobFailed)

    (json \ "Result").extract[String] match {
      case `jobSucceededString` => JobSucceeded
      case `jobFailedString` => jobFailedFromJson(json)
    }
  }

private[spark] case object JobSucceeded extends JobResult {
  override def toJson = ("Status" -> "Success")
  private def jobFailedFromJson(json: JValue): JobResult = {
    implicit val format = DefaultFormats
    new JobFailed(
      Utils.exceptionFromJson(json \ "Exception"),
      (json \ "Failed Stage ID").extract[Int])
  }
}

private[spark] case object JobSucceeded extends JobResult

private[spark] case class JobFailed(exception: Exception, failedStageId: Int)
  extends JobResult {
  override def toJson = {
    ("Status" -> "Failed") ~
    ("Exception" -> exception.getMessage) ~
    val exceptionJson = Utils.exceptionToJson(exception)

    super.toJson ~
      ("Exception" -> exceptionJson) ~
      ("Failed Stage ID" -> failedStageId)
  }
}
