From d05f7a94a852c6bca53ea8f245e0ba156f872144 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 16 Mar 2015 11:35:49 -0500
Subject: [PATCH] dont use case classes for status api POJOs, since they have binary compatibility issues

---
 .../spark/status/api/v1/AllJobsResource.scala |   2 +-
 .../spark/status/api/v1/AllRDDResource.scala  |   6 +-
 .../status/api/v1/AllStagesResource.scala     |  16 +-
 .../api/v1/ApplicationListResource.scala      |   4 +-
 .../org/apache/spark/status/api/v1/api.scala  | 274 +++++++++---------
 .../scala/org/apache/spark/ui/SparkUI.scala   |   2 +-
 6 files changed, 152 insertions(+), 152 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
index 12db45ef7b13e..efddc8f14df83 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -72,7 +72,7 @@ object AllJobsResource {
     }
     val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown Stage Name)")
     val lastStageDescription = lastStageData.flatMap { _.description }
-    JobData(
+    new JobData(
       jobId = job.jobId,
       name = lastStageName,
       description = lastStageDescription,
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 87b289183df69..1c540288c40f9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -73,7 +73,7 @@ object AllRDDResource {
 
     val dataDistribution = if (includeDetails) {
       Some(storageStatusList.map { status =>
-        RDDDataDistribution(
+        new RDDDataDistribution(
           address = status.blockManagerId.hostPort,
           memoryUsed = status.memUsedByRdd(rddId),
           memoryRemaining = status.memRemaining,
@@ -84,7 +84,7 @@
     }
     val partitions = if (includeDetails) {
       Some(blocks.map { case(id, block, locations) =>
-        RDDPartitionInfo(
+        new RDDPartitionInfo(
           blockName = id.name,
           storageLevel = block.storageLevel.description,
           memoryUsed = block.memSize,
@@ -96,7 +96,7 @@
       None
     }
 
-    RDDStorageInfo(
+    new RDDStorageInfo(
      id = rddId,
      name = rddInfo.name,
      numPartitions = rddInfo.numPartitions,
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 0bda37755c009..516d76edb9609 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -72,7 +72,7 @@ object AllStagesResource {
     }
     val executorSummary = if(includeDetails) {
       Some(stageUiData.executorSummary.map { case(k,summary) =>
-        k -> ExecutorStageSummary(
+        k -> new ExecutorStageSummary(
           taskTime = summary.taskTime,
           failedTasks = summary.failedTasks,
           succeededTasks = summary.succeededTasks,
@@ -87,7 +87,7 @@
     } else {
       None
     }
-    StageData(
+    new StageData(
       status = status,
       stageId = stageInfo.stageId,
       numActiveTasks = stageUiData.numActiveTasks,
@@ -126,7 +126,7 @@
   }
 
   def convertTaskData(uiData: TaskUIData): TaskData = {
-    TaskData(
+    new TaskData(
       taskId = uiData.taskInfo.taskId,
       index = uiData.taskInfo.index,
       attempt = uiData.taskInfo.attempt,
@@ -141,7 +141,7 @@
   }
 
   def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
-    TaskMetrics(
+    new TaskMetrics(
       executorDeserializeTime = internal.executorDeserializeTime,
       executorRunTime = internal.executorRunTime,
       resultSize = internal.resultSize,
@@ -157,21 +157,21 @@
   }
 
   def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
-    InputMetrics(
+    new InputMetrics(
       bytesRead = internal.bytesRead,
       recordsRead = internal.recordsRead
     )
   }
 
   def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
-    OutputMetrics(
+    new OutputMetrics(
       bytesWritten = internal.bytesWritten,
       recordsWritten = internal.recordsWritten
     )
   }
 
   def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
-    ShuffleReadMetrics(
+    new ShuffleReadMetrics(
       remoteBlocksFetched = internal.remoteBlocksFetched,
       localBlocksFetched = internal.localBlocksFetched,
       fetchWaitTime = internal.fetchWaitTime,
@@ -182,7 +182,7 @@
   }
 
   def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
-    ShuffleWriteMetrics(
+    new ShuffleWriteMetrics(
       bytesWritten = internal.shuffleBytesWritten,
       writeTime = internal.shuffleWriteTime,
       recordsWritten = internal.shuffleRecordsWritten
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 52ba2f31e3975..b82544584f5f9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -55,7 +55,7 @@ class ApplicationListResource(uiRoot: UIRoot) {
 
 object ApplicationsListResource {
   def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
-    ApplicationInfo(
+    new ApplicationInfo(
       id = app.id,
       name = app.name,
       startTime = new Date(app.startTime),
@@ -68,7 +68,7 @@ object ApplicationsListResource {
   def convertApplicationInfo(
       internal: InternalApplicationInfo,
       completed: Boolean): ApplicationInfo = {
-    ApplicationInfo(
+    new ApplicationInfo(
       id = internal.id,
       name = internal.desc.name,
       startTime = new Date(internal.startTime),
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 9001207ac737b..04c3a19eecdfe 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -23,171 +23,171 @@ import scala.collection.Map
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.status.api.StageStatus
 
-case class ApplicationInfo(
-    id: String,
-    name: String,
-    startTime: Date,
-    endTime: Date,
-    sparkUser: String,
-    completed: Boolean = false
+class ApplicationInfo(
+    val id: String,
+    val name: String,
+    val startTime: Date,
+    val endTime: Date,
+    val sparkUser: String,
+    val completed: Boolean = false
 )
 
-case class ExecutorStageSummary(
-    taskTime : Long,
-    failedTasks : Int,
-    succeededTasks : Int,
-    inputBytes : Long,
-    outputBytes : Long,
-    shuffleRead : Long,
-    shuffleWrite : Long,
-    memoryBytesSpilled : Long,
-    diskBytesSpilled : Long
+class ExecutorStageSummary(
+    val taskTime : Long,
+    val failedTasks : Int,
+    val succeededTasks : Int,
+    val inputBytes : Long,
+    val outputBytes : Long,
+    val shuffleRead : Long,
+    val shuffleWrite : Long,
+    val memoryBytesSpilled : Long,
+    val diskBytesSpilled : Long
 )
 
-case class ExecutorSummary(
-    id: String,
-    hostPort: String,
-    rddBlocks: Int,
-    memoryUsed: Long,
-    diskUsed: Long,
-    activeTasks: Int,
-    failedTasks: Int,
-    completedTasks: Int,
-    totalTasks: Int,
-    totalDuration: Long,
-    totalInputBytes: Long,
-    totalShuffleRead: Long,
-    totalShuffleWrite: Long,
-    maxMemory: Long,
-    executorLogs: Map[String, String]
+class ExecutorSummary(
+    val id: String,
+    val hostPort: String,
+    val rddBlocks: Int,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val activeTasks: Int,
+    val failedTasks: Int,
+    val completedTasks: Int,
+    val totalTasks: Int,
+    val totalDuration: Long,
+    val totalInputBytes: Long,
+    val totalShuffleRead: Long,
+    val totalShuffleWrite: Long,
+    val maxMemory: Long,
+    val executorLogs: Map[String, String]
 )
 
-case class JobData(
-    jobId: Int,
-    name: String,
-    description: Option[String],
-    submissionTime: Option[Date],
-    completionTime: Option[Date],
-    stageIds: Seq[Int],
-    jobGroup: Option[String],
-    status: JobExecutionStatus,
-    numTasks: Int,
-    numActiveTasks: Int,
-    numCompletedTasks: Int,
-    numSkippedTasks: Int,
-    numFailedTasks: Int,
-    numActiveStages: Int,
-    numCompletedStages: Int,
-    numSkippedStages: Int,
-    numFailedStages: Int
+class JobData(
+    val jobId: Int,
+    val name: String,
+    val description: Option[String],
+    val submissionTime: Option[Date],
+    val completionTime: Option[Date],
+    val stageIds: Seq[Int],
+    val jobGroup: Option[String],
+    val status: JobExecutionStatus,
+    val numTasks: Int,
+    val numActiveTasks: Int,
+    val numCompletedTasks: Int,
+    val numSkippedTasks: Int,
+    val numFailedTasks: Int,
+    val numActiveStages: Int,
+    val numCompletedStages: Int,
+    val numSkippedStages: Int,
+    val numFailedStages: Int
 )
 
 // Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
 // page ... does anybody pay attention to it?
-case class RDDStorageInfo(
-    id: Int,
-    name: String,
-    numPartitions: Int,
-    numCachedPartitions: Int,
-    storageLevel: String,
-    memoryUsed: Long,
-    diskUsed: Long,
-    dataDistribution: Option[Seq[RDDDataDistribution]],
-    partitions: Option[Seq[RDDPartitionInfo]]
+class RDDStorageInfo(
+    val id: Int,
+    val name: String,
+    val numPartitions: Int,
+    val numCachedPartitions: Int,
+    val storageLevel: String,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val dataDistribution: Option[Seq[RDDDataDistribution]],
+    val partitions: Option[Seq[RDDPartitionInfo]]
 )
 
-case class RDDDataDistribution(
-    address: String,
-    memoryUsed: Long,
-    memoryRemaining: Long,
-    diskUsed: Long
+class RDDDataDistribution(
+    val address: String,
+    val memoryUsed: Long,
+    val memoryRemaining: Long,
+    val diskUsed: Long
 )
 
-case class RDDPartitionInfo(
-    blockName: String,
-    storageLevel: String,
-    memoryUsed: Long,
-    diskUsed: Long,
-    executors: Seq[String]
+class RDDPartitionInfo(
+    val blockName: String,
+    val storageLevel: String,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val executors: Seq[String]
 )
 
-case class StageData(
-    status: StageStatus,
-    stageId: Int,
-    numActiveTasks: Int ,
-    numCompleteTasks: Int,
-    numFailedTasks: Int,
-
-    executorRunTime: Long,
-
-    inputBytes: Long,
-    inputRecords: Long,
-    outputBytes: Long,
-    outputRecords: Long,
-    shuffleReadBytes: Long,
-    shuffleReadRecords: Long,
-    shuffleWriteBytes: Long,
-    shuffleWriteRecords: Long,
-    memoryBytesSpilled: Long,
-    diskBytesSpilled: Long,
-
-    name: String,
-    details: String,
-    schedulingPool: String,
+class StageData(
+    val status: StageStatus,
+    val stageId: Int,
+    val numActiveTasks: Int ,
+    val numCompleteTasks: Int,
+    val numFailedTasks: Int,
+
+    val executorRunTime: Long,
+
+    val inputBytes: Long,
+    val inputRecords: Long,
+    val outputBytes: Long,
+    val outputRecords: Long,
+    val shuffleReadBytes: Long,
+    val shuffleReadRecords: Long,
+    val shuffleWriteBytes: Long,
+    val shuffleWriteRecords: Long,
+    val memoryBytesSpilled: Long,
+    val diskBytesSpilled: Long,
+
+    val name: String,
+    val details: String,
+    val schedulingPool: String,
 
     //TODO what to do about accumulables?
-    tasks: Option[Map[Long, TaskData]],
-    executorSummary:Option[Map[String,ExecutorStageSummary]]
+    val tasks: Option[Map[Long, TaskData]],
+    val executorSummary:Option[Map[String,ExecutorStageSummary]]
 )
 
-case class TaskData(
-    taskId: Long,
-    index: Int,
-    attempt: Int,
-    launchTime: Date,
-    executorId: String,
-    host: String,
-    taskLocality: String,
-    speculative: Boolean,
-    errorMessage: Option[String] = None,
-    taskMetrics: Option[TaskMetrics] = None
+class TaskData(
+    val taskId: Long,
+    val index: Int,
+    val attempt: Int,
+    val launchTime: Date,
+    val executorId: String,
+    val host: String,
+    val taskLocality: String,
+    val speculative: Boolean,
+    val errorMessage: Option[String] = None,
+    val taskMetrics: Option[TaskMetrics] = None
 )
 
-case class TaskMetrics(
-    executorDeserializeTime: Long,
-    executorRunTime: Long,
-    resultSize: Long,
-    jvmGcTime: Long,
-    resultSerializationTime: Long,
-    memoryBytesSpilled: Long,
-    diskBytesSpilled: Long,
-    inputMetrics: Option[InputMetrics],
-    outputMetrics: Option[OutputMetrics],
-    shuffleReadMetrics: Option[ShuffleReadMetrics],
-    shuffleWriteMetrics: Option[ShuffleWriteMetrics]
+class TaskMetrics(
+    val executorDeserializeTime: Long,
+    val executorRunTime: Long,
+    val resultSize: Long,
+    val jvmGcTime: Long,
+    val resultSerializationTime: Long,
+    val memoryBytesSpilled: Long,
+    val diskBytesSpilled: Long,
+    val inputMetrics: Option[InputMetrics],
+    val outputMetrics: Option[OutputMetrics],
+    val shuffleReadMetrics: Option[ShuffleReadMetrics],
+    val shuffleWriteMetrics: Option[ShuffleWriteMetrics]
 )
 
-case class InputMetrics(
-    bytesRead: Long,
-    recordsRead: Long
+class InputMetrics(
+    val bytesRead: Long,
+    val recordsRead: Long
 )
 
-case class OutputMetrics(
-    bytesWritten: Long,
-    recordsWritten: Long
+class OutputMetrics(
+    val bytesWritten: Long,
+    val recordsWritten: Long
 )
 
-case class ShuffleReadMetrics(
-    remoteBlocksFetched: Int,
-    localBlocksFetched: Int,
-    fetchWaitTime: Long,
-    remoteBytesRead: Long,
-    totalBlocksFetched: Int,
-    recordsRead: Long
+class ShuffleReadMetrics(
+    val remoteBlocksFetched: Int,
+    val localBlocksFetched: Int,
+    val fetchWaitTime: Long,
+    val remoteBytesRead: Long,
+    val totalBlocksFetched: Int,
+    val recordsRead: Long
 )
 
-case class ShuffleWriteMetrics(
-    bytesWritten: Long,
-    writeTime: Long,
-    recordsWritten: Long
+class ShuffleWriteMetrics(
+    val bytesWritten: Long,
+    val writeTime: Long,
+    val recordsWritten: Long
 )
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 091f55767c9c3..50f84aa3b5533 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -94,7 +94,7 @@ private[spark] class SparkUI private (
   }
 
   def getApplicationInfoList: Seq[ApplicationInfo] = {
-    Seq(ApplicationInfo(
+    Seq(new ApplicationInfo(
      id = appName,
      name = appName,
      //TODO
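Background on the commit subject (an illustrative sketch, not part of the patch): a Scala case class freezes the signatures of its compiler-generated constructor, `apply`, `copy`, and `unapply`, so adding even a defaulted field later breaks binary compatibility for callers compiled against the old version. Plain classes with `val` parameters, the style adopted above, can evolve by keeping the old constructor around as an auxiliary one. The class names below are hypothetical and exist only to show the failure mode:

```scala
// Hypothetical v1 of a status POJO, written as a case class.
case class TaskSummary(taskId: Long, duration: Long)

// Hypothetical v2 would add a field with a default value:
//   case class TaskSummary(taskId: Long, duration: Long, host: String = "")
// The generated constructor, apply and copy now take three parameters, so
// bytecode compiled against v1 (e.g. TaskSummary(1L, 10L) or
// ts.copy(duration = 5L)) fails at link time with NoSuchMethodError, even
// though the same source still compiles against v2.

// A plain class can add the field while keeping the old two-argument
// constructor as an auxiliary one, so old call sites keep linking.
class TaskSummaryV2(val taskId: Long, val duration: Long, val host: String) {
  def this(taskId: Long, duration: Long) = this(taskId, duration, "")
}

object BinaryCompatExample {
  def main(args: Array[String]): Unit = {
    // The old two-argument call site still works against the new class file.
    val summary = new TaskSummaryV2(1L, 10L)
    println(s"task ${summary.taskId} ran for ${summary.duration} ms on '${summary.host}'")
  }
}
```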