
General refactor
This includes reverting previous formatting and naming changes that are irrelevant to
this patch.
andrewor14 committed Feb 17, 2014
1 parent 4dfcd22 commit f3fc13b
Showing 22 changed files with 254 additions and 256 deletions.
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -31,11 +31,11 @@ import net.liftweb.json.DefaultFormats
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
sealed trait TaskEndReason extends JsonSerializable {
private[spark] sealed trait TaskEndReason extends JsonSerializable {
override def toJson = "Reason" -> Utils.getFormattedClassName(this)
}

case object TaskEndReason {
private[spark] case object TaskEndReason {
def fromJson(json: JValue): TaskEndReason = {
implicit val format = DefaultFormats
val success = Utils.getFormattedClassName(Success)
@@ -84,12 +84,12 @@ case object TaskEndReason {
}
}

case object Success extends TaskEndReason
private[spark] case object Success extends TaskEndReason

// Task was finished earlier but we've now lost it
case object Resubmitted extends TaskEndReason
private[spark] case object Resubmitted extends TaskEndReason

case class FetchFailed(
private[spark] case class FetchFailed(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
@@ -104,7 +104,7 @@ case class FetchFailed(
}
}

case class ExceptionFailure(
private[spark] case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
@@ -125,18 +125,18 @@ case class ExceptionFailure(
* The task finished successfully, but the result was lost from the executor's block manager before
* it was fetched.
*/
case object TaskResultLost extends TaskEndReason
private[spark] case object TaskResultLost extends TaskEndReason

case object TaskKilled extends TaskEndReason
private[spark] case object TaskKilled extends TaskEndReason

/**
* The task failed because the executor that it was running on was lost. This may happen because
* the task crashed the JVM.
*/
case object ExecutorLostFailure extends TaskEndReason
private[spark] case object ExecutorLostFailure extends TaskEndReason

/**
* We don't know why the task ended -- for example, because of a ClassNotFound exception when
* deserializing the task result.
*/
case object UnknownReason extends TaskEndReason
private[spark] case object UnknownReason extends TaskEndReason
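The change above re-scopes the TaskEndReason hierarchy as private[spark] while keeping the JSON pattern this patch introduces: toJson tags each reason with its class name under a "Reason" key, and fromJson dispatches on that tag. Below is a minimal, self-contained sketch of that round trip using lift-json (the net.liftweb.json import shown above); the Mini* names and the error handling are illustrative stand-ins, not Spark's actual code.

```scala
import net.liftweb.json._
import net.liftweb.json.JsonDSL._

// Stand-in for Utils.getFormattedClassName: drop the '$' suffix that the JVM
// gives to the runtime class of a Scala singleton object.
object MiniUtils {
  def getFormattedClassName(obj: AnyRef): String =
    obj.getClass.getSimpleName.stripSuffix("$")
}

sealed trait MiniTaskEndReason {
  // Serialize as a single "Reason" field carrying the class name.
  def toJson: JValue = "Reason" -> MiniUtils.getFormattedClassName(this)
}
case object MiniSuccess extends MiniTaskEndReason
case object MiniResubmitted extends MiniTaskEndReason

object MiniTaskEndReason {
  // Deserialize by matching the "Reason" tag back to its case object.
  def fromJson(json: JValue): MiniTaskEndReason = {
    implicit val formats = DefaultFormats
    (json \ "Reason").extract[String] match {
      case "MiniSuccess"     => MiniSuccess
      case "MiniResubmitted" => MiniResubmitted
      case other             => sys.error("Unknown task end reason: " + other)
    }
  }
}
```

A quick sanity check of the round trip: MiniTaskEndReason.fromJson(MiniSuccess.toJson) returns MiniSuccess.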
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -70,8 +70,8 @@ class TaskMetrics extends Serializable with JsonSerializable {
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None

/**
* If this task writes to shuffle output, metrics on the written shuffle data will be
* collected here
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
* here
*/
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -32,7 +32,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.{Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -596,7 +596,7 @@ class DAGScheduler(

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
val stageId = task.stageId
val taskType = task.getClass.getSimpleName
val taskType = Utils.getFormattedClassName(task)
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)

@@ -826,7 +826,7 @@ class DAGScheduler(
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
pendingTasks(stage) -= task
stageToInfos(stage).taskInfo += event.taskInfo -> event.taskMetrics
stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
task match {
case rt: ResultTask[_, _] =>
resultStageToJob.get(stage) match {
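The getSimpleName to Utils.getFormattedClassName switch above keeps the taskType string consistent with the class-name tags the JSON serializers in this patch emit (for example Utils.getFormattedClassName(Success) in TaskEndReason.fromJson). As a rough illustration, assuming getFormattedClassName simply strips the trailing '$' the JVM appends to Scala singletons, the two differ like this:

```scala
// Illustrative only: the stripSuffix("$") behaviour is an assumption about what
// Utils.getFormattedClassName does, not taken from the Spark source.
case object DemoSuccess

object ClassNameDemo {
  def main(args: Array[String]): Unit = {
    println(DemoSuccess.getClass.getSimpleName)                  // prints "DemoSuccess$"
    println(DemoSuccess.getClass.getSimpleName.stripSuffix("$")) // prints "DemoSuccess"
  }
}
```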
114 changes: 57 additions & 57 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -34,7 +34,7 @@ import org.apache.spark.executor.TaskMetrics
* is created. Note that each JobLogger only works for one SparkContext
*/

class JobLogger(user: String, logDirName: String)
class JobLogger(val user: String, val logDirName: String)
extends SparkListener with Logging {

def this() = this(System.getProperty("user.name", "<unknown>"),
@@ -47,19 +47,19 @@ class JobLogger(user: String, logDirName: String)
"/tmp/spark-%s".format(user)
}

private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIdToJobId = new HashMap[Int, Int]
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIDToJobID = new HashMap[Int, Int]
private val jobIDToStageIDs = new HashMap[Int, Seq[Int]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]

createLogDir()

// The following 5 functions are used only in testing.
private[scheduler] def getLogDir = logDir
private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
private[scheduler] def getStageIdToJobId = stageIdToJobId
private[scheduler] def getJobIdToStageIds = jobIdToStageIds
private[scheduler] def getJobIDToPrintWriter = jobIDToPrintWriter
private[scheduler] def getStageIDToJobID = stageIDToJobID
private[scheduler] def getJobIDToStageIDs = jobIDToStageIDs
private[scheduler] def getEventQueue = eventQueue

/** Create a folder for log files, the folder's name is the creation time of jobLogger */
@@ -76,90 +76,90 @@ class JobLogger(user: String, logDirName: String)

/**
* Create a log file for one job
* @param jobId ID of the job
* @param jobID ID of the job
* @exception FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobId: Int) {
protected def createLogWriter(jobID: Int) {
try {
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobId)
jobIdToPrintWriter += (jobId -> fileWriter)
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
jobIDToPrintWriter += (jobID -> fileWriter)
} catch {
case e: FileNotFoundException => e.printStackTrace()
}
}

/**
* Close log file, and clean the stage relationship in stageIdToJobId
* @param jobId ID of the job
* Close log file, and clean the stage relationship in stageIDToJobID
* @param jobID ID of the job
*/
protected def closeLogWriter(jobId: Int) {
jobIdToPrintWriter.get(jobId).foreach { fileWriter =>
protected def closeLogWriter(jobID: Int) {
jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
fileWriter.close()
jobIdToStageIds.get(jobId).foreach(_.foreach{ stageId =>
stageIdToJobId -= stageId
jobIDToStageIDs.get(jobID).foreach(_.foreach{ stageID =>
stageIDToJobID -= stageID
})
jobIdToPrintWriter -= jobId
jobIdToStageIds -= jobId
jobIDToPrintWriter -= jobID
jobIDToStageIDs -= jobID
}
}

/**
* Build up the maps that represent stage-job relationships
* @param jobId ID of the job
* @param stageIds IDs of the associated stages
* @param jobID ID of the job
* @param stageIDs IDs of the associated stages
*/
protected def buildJobStageDependencies(jobId: Int, stageIds: Seq[Int]) = {
jobIdToStageIds(jobId) = stageIds
stageIds.foreach { stageId => stageIdToJobId(stageId) = jobId }
protected def buildJobStageDependencies(jobID: Int, stageIDs: Seq[Int]) = {
jobIDToStageIDs(jobID) = stageIDs
stageIDs.foreach { stageID => stageIDToJobID(stageID) = jobID }
}

/**
* Write info into log file
* @param jobId ID of the job
* @param jobID ID of the job
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
*/
protected def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true) {
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " + info
}
jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
}

/**
* Write info into log file
* @param stageId ID of the stage
* @param stageID ID of the stage
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
*/
protected def stageLogInfo(stageId: Int, info: String, withTime: Boolean = true) {
stageIdToJobId.get(stageId).foreach(jobId => jobLogInfo(jobId, info, withTime))
protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
}

/**
* Log job properties into job log file
* @param jobId ID of the job
* @param jobID ID of the job
* @param properties Properties of the job
*/
protected def logJobProperties(jobId: Int, properties: Properties) {
protected def recordJobProperties(jobID: Int, properties: Properties) {
if (properties != null) {
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
jobLogInfo(jobId, description, false)
jobLogInfo(jobID, description, false)
}
}

/**
* Log task metrics into job log files, including execution info and shuffle metrics
* @param stageId Stage ID of the task
* @param stageID Stage ID of the task
* @param status Status info of the task
* @param taskInfo Task description info
* @param taskMetrics Task running metrics
*/
protected def logTaskMetrics(stageId: Int, status: String,
protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageId +
val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
@@ -178,7 +178,7 @@ class JobLogger(user: String, logDirName: String)
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}

/**
@@ -196,8 +196,8 @@ class JobLogger(user: String, logDirName: String)
* @param stageCompleted Stage completed event
*/
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageId = stageCompleted.stageInfo.stageId
stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
val stageID = stageCompleted.stageInfo.stageId
stageLogInfo(stageID, "STAGE_ID=%d STATUS=COMPLETED".format(stageID))
}

/**
@@ -209,7 +209,7 @@ class JobLogger(user: String, logDirName: String)
var taskStatus = taskEnd.taskType
taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
logTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
case Resubmitted =>
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
@@ -223,34 +223,34 @@ class JobLogger(user: String, logDirName: String)
}
}

/**
* When job starts, record job property and stage graph
* @param jobStart Job start event
*/
override def onJobStart(jobStart: SparkListenerJobStart) {
val jobId = jobStart.jobId
val properties = jobStart.properties
createLogWriter(jobId)
buildJobStageDependencies(jobId, jobStart.stageIds)
logJobProperties(jobId, properties)
jobLogInfo(jobId, "JOB_ID=" + jobId + " STATUS=STARTED")
}

/**
* When job ends, recording job completion status and close log file
* @param jobEnd Job end event
*/
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val jobId = jobEnd.jobId
var info = "JOB_ID=" + jobId
val jobID = jobEnd.jobId
var info = "JOB_ID=" + jobID
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
}
jobLogInfo(jobId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(jobId)
jobLogInfo(jobID, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(jobID)
}

/**
* When job starts, record job property and stage graph
* @param jobStart Job start event
*/
override def onJobStart(jobStart: SparkListenerJobStart) {
val jobID = jobStart.jobId
val properties = jobStart.properties
createLogWriter(jobID)
buildJobStageDependencies(jobID, jobStart.stageIds)
recordJobProperties(jobID, properties)
jobLogInfo(jobID, "JOB_ID=" + jobID + " STATUS=STARTED")
}
}
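Since JobLogger is an ordinary SparkListener, it can be attached to a running SparkContext. A minimal usage sketch, assuming the SparkContext.addSparkListener hook available in this version of Spark; the local job below is purely illustrative:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.JobLogger

object JobLoggerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "JobLoggerExample")

    // With the no-arg constructor, logs land under logDir/logDirName/<jobId>,
    // one file per job, where logDir defaults to "/tmp/spark-<user>" (see above).
    sc.addSparkListener(new JobLogger())

    // Any job will do; each one triggers the onJobStart/onTaskEnd/onJobEnd callbacks above.
    sc.parallelize(1 to 1000).map(_ * 2).count()

    sc.stop()
  }
}
```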
core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -51,8 +51,8 @@ private[spark] object JobResult {

private[spark] case object JobSucceeded extends JobResult

private[spark] case class JobFailed(exception: Exception, failedStageId: Int)
extends JobResult {
private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult {
override def toJson = {
val exceptionJson = Utils.exceptionToJson(exception)

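JobFailed.toJson above delegates the exception to Utils.exceptionToJson, whose body is not shown in this diff. A plausible shape for such a helper, sketched with lift-json; the key names and the flattened stack-trace format are assumptions, not Spark's actual output:

```scala
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.JsonDSL._

object ExceptionJsonSketch {
  // Capture the message and a flattened stack trace; real key names may differ.
  def exceptionToJson(exception: Exception): JValue = {
    ("Message" -> Option(exception.getMessage).getOrElse("")) ~
    ("Stack Trace" -> exception.getStackTrace.map(_.toString).mkString("\n"))
  }
}
```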