[SPARK-18256] Improve the performance of event log replay in HistoryServer #15756

Closed · wants to merge 3 commits
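In short, the diff below removes two per-event costs from JsonProtocol, which the HistoryServer exercises once for every line of an event log during replay: the formatted class names written to and matched against the "Event" field are now computed once in private constants objects rather than via Utils.getFormattedClassName on every call, and json4s's extractOpt is replaced with Utils.jsonOption(...).map(_.extract[T]). A minimal, self-contained sketch of the name-hoisting pattern (the types and helper below are stand-ins for illustration, not Spark's real classes):

```scala
object EventNameDemo {
  // Stand-in event types; Spark's are the SparkListener* case classes.
  case class StageSubmitted(stageId: Int)
  case class StageCompleted(stageId: Int)

  object FormattedNames {
    // Mimics Utils.getFormattedClassName: drop the '$' that Scala appends
    // to a companion object's class name. Each val is computed once, when
    // this object is first touched, not once per serialized event.
    private def formatted(obj: AnyRef): String =
      obj.getClass.getSimpleName.stripSuffix("$")

    val stageSubmitted: String = formatted(StageSubmitted)
    val stageCompleted: String = formatted(StageCompleted)
  }

  def eventTypeOf(eventField: String): String = {
    import FormattedNames._
    // Backquoted identifiers compare against the precomputed constants
    // instead of binding fresh pattern variables.
    eventField match {
      case `stageSubmitted` => "stage submitted"
      case `stageCompleted` => "stage completed"
      case other            => s"unknown event: $other"
    }
  }
}
```

The hoisting pays off twice: serialization stops recomputing the name for every event, and deserialization can keep pattern-matching on stable identifiers that now refer to constants evaluated a single time.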
106 changes: 64 additions & 42 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -107,28 +107,28 @@ private[spark] object JsonProtocol {
def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
val properties = propertiesToJson(stageSubmitted.properties)
("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
("Stage Info" -> stageInfo) ~
("Properties" -> properties)
}

def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
("Stage Info" -> stageInfo)
}

def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
val taskInfo = taskStart.taskInfo
("Event" -> Utils.getFormattedClassName(taskStart)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
("Stage ID" -> taskStart.stageId) ~
("Stage Attempt ID" -> taskStart.stageAttemptId) ~
("Task Info" -> taskInfoToJson(taskInfo))
}

def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
val taskInfo = taskGettingResult.taskInfo
("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
("Task Info" -> taskInfoToJson(taskInfo))
}

@@ -137,7 +137,7 @@ private[spark] object JsonProtocol {
val taskInfo = taskEnd.taskInfo
val taskMetrics = taskEnd.taskMetrics
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
("Event" -> Utils.getFormattedClassName(taskEnd)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
("Stage ID" -> taskEnd.stageId) ~
("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
("Task Type" -> taskEnd.taskType) ~
@@ -148,7 +148,7 @@

def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
val properties = propertiesToJson(jobStart.properties)
("Event" -> Utils.getFormattedClassName(jobStart)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
("Job ID" -> jobStart.jobId) ~
("Submission Time" -> jobStart.time) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
@@ -158,7 +158,7 @@

def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
val jobResult = jobResultToJson(jobEnd.jobResult)
("Event" -> Utils.getFormattedClassName(jobEnd)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
("Job ID" -> jobEnd.jobId) ~
("Completion Time" -> jobEnd.time) ~
("Job Result" -> jobResult)
@@ -170,7 +170,7 @@
val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
("JVM Information" -> jvmInformation) ~
("Spark Properties" -> sparkProperties) ~
("System Properties" -> systemProperties) ~
@@ -179,26 +179,26 @@

def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem) ~
("Timestamp" -> blockManagerAdded.time)
}

def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
("Block Manager ID" -> blockManagerId) ~
("Timestamp" -> blockManagerRemoved.time)
}

def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
("RDD ID" -> unpersistRDD.rddId)
}

def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
("Event" -> Utils.getFormattedClassName(applicationStart)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
@@ -208,33 +208,33 @@
}

def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
("Timestamp" -> applicationEnd.time)
}

def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
("Event" -> Utils.getFormattedClassName(executorAdded)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
("Timestamp" -> executorAdded.time) ~
("Executor ID" -> executorAdded.executorId) ~
("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
}

def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
("Timestamp" -> executorRemoved.time) ~
("Executor ID" -> executorRemoved.executorId) ~
("Removed Reason" -> executorRemoved.reason)
}

def logStartToJson(logStart: SparkListenerLogStart): JValue = {
("Event" -> Utils.getFormattedClassName(logStart)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
("Spark Version" -> SPARK_VERSION)
}

def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
("Executor ID" -> execId) ~
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
@@ -485,7 +485,7 @@ private[spark] object JsonProtocol {
* JSON deserialization methods for SparkListenerEvents |
* ---------------------------------------------------- */

def sparkEventFromJson(json: JValue): SparkListenerEvent = {
private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES {
val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
@@ -503,6 +503,10 @@
val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
}

def sparkEventFromJson(json: JValue): SparkListenerEvent = {
import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._

(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -540,7 +544,8 @@

def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val stageAttemptId =
Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
Contributor: what's the difference? Is extractOpt really slow?

Contributor: if so maybe we should add a scalastyle to ban it

Contributor: oh wait you already did 😄

val taskInfo = taskInfoFromJson(json \ "Task Info")
SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
}
@@ -552,7 +557,8 @@

def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val stageAttemptId =
Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val taskType = (json \ "Task Type").extract[String]
val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -662,20 +668,22 @@

def stageInfoFromJson(json: JValue): StageInfo = {
val stageId = (json \ "Stage ID").extract[Int]
val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
val details = (json \ "Details").extractOpt[String].getOrElse("")
val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
val accumulatedValues = {
Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
}

val stageInfo = new StageInfo(
@@ -692,17 +700,17 @@
def taskInfoFromJson(json: JValue): TaskInfo = {
val taskId = (json \ "Task ID").extract[Long]
val index = (json \ "Index").extract[Int]
val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -719,12 +727,13 @@

def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
val id = (json \ "ID").extract[Long]
val name = (json \ "Name").extractOpt[String]
val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
val metadata = (json \ "Metadata").extractOpt[String]
val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
val countFailedValues =
Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
}

@@ -782,9 +791,11 @@
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
readMetrics.incLocalBytesRead(
Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
readMetrics.incRecordsRead(
Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
metrics.mergeShuffleReadMetrics()
}

@@ -793,23 +804,25 @@
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
val writeMetrics = metrics.shuffleWriteMetrics
writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0L))
writeMetrics.incRecordsWritten(
Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
}

// Output metrics
Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
val outputMetrics = metrics.outputMetrics
outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
outputMetrics.setRecordsWritten(
Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
}

// Input metrics
Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
val inputMetrics = metrics.inputMetrics
inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
inputMetrics.incRecordsRead(
Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
}

// Updated blocks
@@ -824,7 +837,7 @@
metrics
}

def taskEndReasonFromJson(json: JValue): TaskEndReason = {
private object TASK_END_REASON_FORMATTED_CLASS_NAMES {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -834,6 +847,10 @@
val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)
}

def taskEndReasonFromJson(json: JValue): TaskEndReason = {
import TASK_END_REASON_FORMATTED_CLASS_NAMES._

(json \ "Reason").extract[String] match {
case `success` => Success
@@ -850,7 +867,8 @@
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
val fullStackTrace =
Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
// Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
.map(_.extract[List[JValue]].map(accumulableInfoFromJson))
@@ -891,9 +909,13 @@
BlockManagerId(executorId, host, port)
}

def jobResultFromJson(json: JValue): JobResult = {
private object JOB_RESULT_FORMATTED_CLASS_NAMES {
val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
val jobFailed = Utils.getFormattedClassName(JobFailed)
}

def jobResultFromJson(json: JValue): JobResult = {
import JOB_RESULT_FORMATTED_CLASS_NAMES._

(json \ "Result").extract[String] match {
case `jobSucceeded` => JobSucceeded
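On the reviewer's question above ("Is extractOpt really slow?"): a sketch of the difference follows, under the assumption that json4s's extractOpt of this era routes absent fields through the full extraction machinery and signals absence by throwing and catching a MappingException internally. The jsonOption helper below is a close paraphrase of Spark's Utils.jsonOption, a single pattern match on the JNothing sentinel:

```scala
import org.json4s._

object ExtractOptDemo {
  implicit val formats: Formats = DefaultFormats

  // Essentially Spark's Utils.jsonOption: turn json4s's JNothing sentinel
  // (what `json \ "Missing Field"` returns) into None with one match.
  def jsonOption(json: JValue): Option[JValue] = json match {
    case JNothing => None
    case value: JValue => Some(value)
  }

  // Fast path used throughout this diff: one cheap match, then extract
  // only when the field is actually present.
  def stageAttemptId(json: JValue): Int =
    jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)

  // Slow path being banned: for an absent field, extractOpt (assumed to
  // wrap extraction in a try/catch) constructs and throws an exception
  // internally just to return None. Filling a stack trace per missing
  // field adds up across millions of replayed events.
  def stageAttemptIdSlow(json: JValue): Int =
    (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
}
```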
6 changes: 6 additions & 0 deletions scalastyle-config.xml
@@ -217,6 +217,12 @@ This file is divided into 3 sections:
of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
</check>

<check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">extractOpt</parameter></parameters>
<customMessage>Use Utils.jsonOption(x).map(_.extract[T]) instead of .extractOpt[T], as the latter
is slower.</customMessage>
</check>

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
<parameters>
<parameter name="groups">java,scala,3rdParty,spark</parameter>
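As a usage note, scalastyle's TokenChecker matches its regex against source tokens, so this rule flags any occurrence of extractOpt, not just calls on a JValue. A sketch of what it rejects and accepts, plus scalastyle's standard suppression comments keyed by the check's customId (an assumption about how one would silence it, should a legitimate use ever appear):

```scala
// Fails the new rule: the token "extractOpt" appears.
//   val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)

// Passes: the spelling the rule's message recommends.
//   val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)

// Standard scalastyle escape hatch, keyed by the check's customId:
// scalastyle:off extractopt
//   ...exceptional code that genuinely needs extractOpt...
// scalastyle:on extractopt
```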