diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c11eb3ffa4601..6593aab33f6df 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -107,20 +107,20 @@ private[spark] object JsonProtocol { def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = { val stageInfo = stageInfoToJson(stageSubmitted.stageInfo) val properties = propertiesToJson(stageSubmitted.properties) - ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~ ("Stage Info" -> stageInfo) ~ ("Properties" -> properties) } def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = { val stageInfo = stageInfoToJson(stageCompleted.stageInfo) - ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~ ("Stage Info" -> stageInfo) } def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = { val taskInfo = taskStart.taskInfo - ("Event" -> Utils.getFormattedClassName(taskStart)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~ ("Stage ID" -> taskStart.stageId) ~ ("Stage Attempt ID" -> taskStart.stageAttemptId) ~ ("Task Info" -> taskInfoToJson(taskInfo)) @@ -128,7 +128,7 @@ private[spark] object JsonProtocol { def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = { val taskInfo = taskGettingResult.taskInfo - ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~ ("Task Info" -> taskInfoToJson(taskInfo)) } @@ -137,7 +137,7 @@ private[spark] object JsonProtocol { val taskInfo = taskEnd.taskInfo val taskMetrics = taskEnd.taskMetrics val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing - ("Event" -> Utils.getFormattedClassName(taskEnd)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~ ("Stage ID" -> taskEnd.stageId) ~ ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~ ("Task Type" -> taskEnd.taskType) ~ @@ -148,7 +148,7 @@ private[spark] object JsonProtocol { def jobStartToJson(jobStart: SparkListenerJobStart): JValue = { val properties = propertiesToJson(jobStart.properties) - ("Event" -> Utils.getFormattedClassName(jobStart)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~ ("Job ID" -> jobStart.jobId) ~ ("Submission Time" -> jobStart.time) ~ ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0 @@ -158,7 +158,7 @@ private[spark] object JsonProtocol { def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = { val jobResult = jobResultToJson(jobEnd.jobResult) - ("Event" -> Utils.getFormattedClassName(jobEnd)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~ ("Job ID" -> jobEnd.jobId) ~ ("Completion Time" -> jobEnd.time) ~ ("Job Result" -> jobResult) @@ -170,7 +170,7 @@ private[spark] object JsonProtocol { val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap) val systemProperties = mapToJson(environmentDetails("System Properties").toMap) val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap) - ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~ ("JVM Information" -> jvmInformation) ~ ("Spark Properties" -> sparkProperties) ~ ("System Properties" -> systemProperties) ~ @@ -179,7 +179,7 @@ private[spark] object JsonProtocol { def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = { val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId) - ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~ ("Block Manager ID" -> blockManagerId) ~ ("Maximum Memory" -> blockManagerAdded.maxMem) ~ ("Timestamp" -> blockManagerAdded.time) @@ -187,18 +187,18 @@ private[spark] object JsonProtocol { def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId) - ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~ ("Block Manager ID" -> blockManagerId) ~ ("Timestamp" -> blockManagerRemoved.time) } def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = { - ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~ ("RDD ID" -> unpersistRDD.rddId) } def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { - ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~ ("App Name" -> applicationStart.appName) ~ ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ @@ -208,33 +208,33 @@ private[spark] object JsonProtocol { } def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { - ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~ ("Timestamp" -> applicationEnd.time) } def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = { - ("Event" -> Utils.getFormattedClassName(executorAdded)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~ ("Timestamp" -> executorAdded.time) ~ ("Executor ID" -> executorAdded.executorId) ~ ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo)) } def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = { - ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~ ("Timestamp" -> executorRemoved.time) ~ ("Executor ID" -> executorRemoved.executorId) ~ ("Removed Reason" -> executorRemoved.reason) } def logStartToJson(logStart: SparkListenerLogStart): JValue = { - ("Event" -> Utils.getFormattedClassName(logStart)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~ ("Spark Version" -> SPARK_VERSION) } def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { val execId = metricsUpdate.execId val accumUpdates = metricsUpdate.accumUpdates - ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~ + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~ ("Executor ID" -> execId) ~ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => ("Task ID" -> taskId) ~ @@ -485,7 +485,7 @@ private[spark] object JsonProtocol { * JSON deserialization methods for SparkListenerEvents | * ---------------------------------------------------- */ - def sparkEventFromJson(json: JValue): SparkListenerEvent = { + private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES { val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted) val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted) val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart) @@ -503,6 +503,10 @@ private[spark] object JsonProtocol { val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) val logStart = Utils.getFormattedClassName(SparkListenerLogStart) val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate) + } + + def sparkEventFromJson(json: JValue): SparkListenerEvent = { + import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._ (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -540,7 +544,8 @@ private[spark] object JsonProtocol { def taskStartFromJson(json: JValue): SparkListenerTaskStart = { val stageId = (json \ "Stage ID").extract[Int] - val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) + val stageAttemptId = + Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0) val taskInfo = taskInfoFromJson(json \ "Task Info") SparkListenerTaskStart(stageId, stageAttemptId, taskInfo) } @@ -552,7 +557,8 @@ private[spark] object JsonProtocol { def taskEndFromJson(json: JValue): SparkListenerTaskEnd = { val stageId = (json \ "Stage ID").extract[Int] - val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) + val stageAttemptId = + Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0) val taskType = (json \ "Task Type").extract[String] val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason") val taskInfo = taskInfoFromJson(json \ "Task Info") @@ -662,20 +668,22 @@ private[spark] object JsonProtocol { def stageInfoFromJson(json: JValue): StageInfo = { val stageId = (json \ "Stage ID").extract[Int] - val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) + val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0) val stageName = (json \ "Stage Name").extract[String] val numTasks = (json \ "Number of Tasks").extract[Int] val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson) val parentIds = Utils.jsonOption(json \ "Parent IDs") .map { l => l.extract[List[JValue]].map(_.extract[Int]) } .getOrElse(Seq.empty) - val details = (json \ "Details").extractOpt[String].getOrElse("") + val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("") val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String]) - val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match { - case Some(values) => values.map(accumulableInfoFromJson) - case None => Seq[AccumulableInfo]() + val accumulatedValues = { + Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match { + case Some(values) => values.map(accumulableInfoFromJson) + case None => Seq[AccumulableInfo]() + } } val stageInfo = new StageInfo( @@ -692,17 +700,17 @@ private[spark] object JsonProtocol { def taskInfoFromJson(json: JValue): TaskInfo = { val taskId = (json \ "Task ID").extract[Long] val index = (json \ "Index").extract[Int] - val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1) + val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1) val launchTime = (json \ "Launch Time").extract[Long] val executorId = (json \ "Executor ID").extract[String] val host = (json \ "Host").extract[String] val taskLocality = TaskLocality.withName((json \ "Locality").extract[String]) - val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false) + val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean]) val gettingResultTime = (json \ "Getting Result Time").extract[Long] val finishTime = (json \ "Finish Time").extract[Long] val failed = (json \ "Failed").extract[Boolean] - val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false) - val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match { + val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean]) + val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match { case Some(values) => values.map(accumulableInfoFromJson) case None => Seq[AccumulableInfo]() } @@ -719,12 +727,13 @@ private[spark] object JsonProtocol { def accumulableInfoFromJson(json: JValue): AccumulableInfo = { val id = (json \ "ID").extract[Long] - val name = (json \ "Name").extractOpt[String] + val name = Utils.jsonOption(json \ "Name").map(_.extract[String]) val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) } val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) } - val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) - val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false) - val metadata = (json \ "Metadata").extractOpt[String] + val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean]) + val countFailedValues = + Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean]) + val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String]) new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata) } @@ -782,9 +791,11 @@ private[spark] object JsonProtocol { readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int]) readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int]) readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long]) - readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)) + readMetrics.incLocalBytesRead( + Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L)) readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long]) - readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L)) + readMetrics.incRecordsRead( + Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L)) metrics.mergeShuffleReadMetrics() } @@ -793,8 +804,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson => val writeMetrics = metrics.shuffleWriteMetrics writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long]) - writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written") - .extractOpt[Long].getOrElse(0L)) + writeMetrics.incRecordsWritten( + Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L)) writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long]) } @@ -802,14 +813,16 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Output Metrics").foreach { outJson => val outputMetrics = metrics.outputMetrics outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long]) - outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L)) + outputMetrics.setRecordsWritten( + Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L)) } // Input metrics Utils.jsonOption(json \ "Input Metrics").foreach { inJson => val inputMetrics = metrics.inputMetrics inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long]) - inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L)) + inputMetrics.incRecordsRead( + Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L)) } // Updated blocks @@ -824,7 +837,7 @@ private[spark] object JsonProtocol { metrics } - def taskEndReasonFromJson(json: JValue): TaskEndReason = { + private object TASK_END_REASON_FORMATTED_CLASS_NAMES { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) val fetchFailed = Utils.getFormattedClassName(FetchFailed) @@ -834,6 +847,10 @@ private[spark] object JsonProtocol { val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied) val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure) val unknownReason = Utils.getFormattedClassName(UnknownReason) + } + + def taskEndReasonFromJson(json: JValue): TaskEndReason = { + import TASK_END_REASON_FORMATTED_CLASS_NAMES._ (json \ "Reason").extract[String] match { case `success` => Success @@ -850,7 +867,8 @@ private[spark] object JsonProtocol { val className = (json \ "Class Name").extract[String] val description = (json \ "Description").extract[String] val stackTrace = stackTraceFromJson(json \ "Stack Trace") - val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull + val fullStackTrace = + Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates") .map(_.extract[List[JValue]].map(accumulableInfoFromJson)) @@ -891,9 +909,13 @@ private[spark] object JsonProtocol { BlockManagerId(executorId, host, port) } - def jobResultFromJson(json: JValue): JobResult = { + private object JOB_RESULT_FORMATTED_CLASS_NAMES { val jobSucceeded = Utils.getFormattedClassName(JobSucceeded) val jobFailed = Utils.getFormattedClassName(JobFailed) + } + + def jobResultFromJson(json: JValue): JobResult = { + import JOB_RESULT_FORMATTED_CLASS_NAMES._ (json \ "Result").extract[String] match { case `jobSucceeded` => JobSucceeded diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 81d57d723a720..48333851efb54 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -217,6 +217,12 @@ This file is divided into 3 sections: of Commons Lang 2 (package org.apache.commons.lang.*) + + extractOpt + Use Utils.jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter + is slower. + + java,scala,3rdParty,spark