diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java index 6571ceeccda7b..6e161313702bb 100644 --- a/core/src/main/java/org/apache/spark/JobExecutionStatus.java +++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java @@ -17,15 +17,9 @@ package org.apache.spark; -import org.apache.spark.status.api.EnumUtil; - public enum JobExecutionStatus { RUNNING, SUCCEEDED, FAILED, - UNKNOWN; - - public static JobExecutionStatus fromString(String str) { - return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str); - } + UNKNOWN } diff --git a/core/src/main/java/org/apache/spark/status/api/ApplicationStatus.java b/core/src/main/java/org/apache/spark/status/api/ApplicationStatus.java deleted file mode 100644 index 17b5bb0cae8ef..0000000000000 --- a/core/src/main/java/org/apache/spark/status/api/ApplicationStatus.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.api; - -public enum ApplicationStatus { - COMPLETED, - RUNNING; - - public static ApplicationStatus fromString(String str) { - return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str); - } - -} diff --git a/core/src/main/java/org/apache/spark/status/api/EnumUtil.java b/core/src/main/java/org/apache/spark/status/api/EnumUtil.java deleted file mode 100644 index d4c678261cf7c..0000000000000 --- a/core/src/main/java/org/apache/spark/status/api/EnumUtil.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api; - -import com.google.common.base.Joiner; - -import java.util.Arrays; - -public class EnumUtil { - public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) { - E[] constants = clz.getEnumConstants(); - if (str == null) { - return null; - } - for (E e : constants) { - if (e.name().equalsIgnoreCase(str)) - return e; - } - throw new IllegalArgumentException( - String.format("Illegal type='%s'. 
Supported type values: %s", - str, Joiner.on(", ").join( - Arrays.asList(constants)))); - } -} diff --git a/core/src/main/java/org/apache/spark/status/api/StageStatus.java b/core/src/main/java/org/apache/spark/status/api/StageStatus.java deleted file mode 100644 index 9cabb93472865..0000000000000 --- a/core/src/main/java/org/apache/spark/status/api/StageStatus.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.api; - -public enum StageStatus { - Active, - Complete, - Failed, - Pending; - - public static StageStatus fromString(String str) { - return EnumUtil.parseIgnoreCase(StageStatus.class, str); - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala index 609b35a7e0188..1a50363336182 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala @@ -21,7 +21,6 @@ import java.util.Date import javax.ws.rs._ import javax.ws.rs.core.MediaType -import org.apache.spark.JobExecutionStatus import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.ui.jobs.UIData.JobUIData @@ -32,14 +31,14 @@ class AllJobsResource(uiRoot: UIRoot) { @GET def jobsList( @PathParam("appId") appId: String, - @QueryParam("status") statuses: java.util.List[JobExecutionStatus] + @QueryParam("status") statuses: java.util.List[JobStatus] ): Seq[JobData] = { uiRoot.withSparkUI(appId) { ui => - val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] = + val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] = AllJobsResource.getStatusToJobs(ui) - val adjStatuses: util.List[JobExecutionStatus] = { + val adjStatuses: util.List[JobStatus] = { if (statuses.isEmpty) { - java.util.Arrays.asList(JobExecutionStatus.values(): _*) + java.util.Arrays.asList(JobStatus.values: _*) } else { statuses @@ -59,12 +58,12 @@ class AllJobsResource(uiRoot: UIRoot) { object AllJobsResource { - def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = { + def getStatusToJobs(ui: SparkUI): Seq[(JobStatus, Seq[JobUIData])] = { val statusToJobs = ui.jobProgressListener.synchronized { Seq( - JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq, - JobExecutionStatus.SUCCEEDED -> 
ui.jobProgressListener.completedJobs.toSeq, - JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq + JobStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq, + JobStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq, + JobStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq ) } statusToJobs @@ -91,7 +90,7 @@ object AllJobsResource { completionTime = job.completionTime.map{new Date(_)}, stageIds = job.stageIds, jobGroup = job.jobGroup, - status = job.status, + status = JobStatus.fromInternalStatus(job.status), numTasks = job.numTasks, numActiveTasks = job.numActiveTasks, numCompletedTasks = job.numCompletedTasks, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 47644f2591c8b..5f9aa11f6f546 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics} import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo} -import org.apache.spark.status.api._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData} @@ -39,7 +38,7 @@ class AllStagesResource(uiRoot: UIRoot) { val stageAndStatus = AllStagesResource.stagesAndStatus(ui) val adjStatuses = { if (statuses.isEmpty()) { - java.util.Arrays.asList(StageStatus.values(): _*) + java.util.Arrays.asList(StageStatus.values: _*) } else { statuses } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index b82544584f5f9..c5d88df2d253e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} -import org.apache.spark.status.api.ApplicationStatus @Produces(Array(MediaType.APPLICATION_JSON)) class ApplicationListResource(uiRoot: UIRoot) { @@ -36,7 +35,7 @@ class ApplicationListResource(uiRoot: UIRoot) { val allApps = uiRoot.getApplicationInfoList val adjStatus = { if (status.isEmpty) { - java.util.Arrays.asList(ApplicationStatus.values(): _*) + java.util.Arrays.asList(ApplicationStatus.values: _*) } else { status } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala index 6ef53449a6478..b87f97e52e220 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala @@ -31,7 +31,7 @@ class OneJobResource(uiRoot: UIRoot) { @PathParam("jobId") jobId: Int ): JobData = { uiRoot.withSparkUI(appId) { ui => - val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] = + val statusToJobs: Seq[(JobStatus, Seq[JobUIData])] = AllJobsResource.getStatusToJobs(ui) val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId} jobOpt.map { job => diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 2ed4f18400996..b0dc4dc270f16 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ 
-21,7 +21,7 @@ import java.util.Date import scala.collection.Map import org.apache.spark.JobExecutionStatus -import org.apache.spark.status.api.StageStatus +import org.apache.spark.util.{SparkEnum, SparkEnumCompanion} class ApplicationInfo( val id: String, @@ -70,7 +70,7 @@ class JobData( val completionTime: Option[Date], val stageIds: Seq[Int], val jobGroup: Option[String], - val status: JobExecutionStatus, + val status: JobStatus, val numTasks: Int, val numActiveTasks: Int, val numCompletedTasks: Int, @@ -208,3 +208,89 @@ class AccumulableInfo ( } } + +private[spark] trait JerseyEnum[T <: SparkEnum] extends SparkEnumCompanion[T] { + def fromString(s: String): T = { + parseIgnoreCase(s).getOrElse { throw new IllegalArgumentException( + s"Illegal type=$s. Supported type values: ${values.map{_.toString}}")} + } +} + +sealed abstract class JobStatus extends SparkEnum +object JobStatus extends JerseyEnum[JobStatus] { + final val RUNNING = { + case object RUNNING extends JobStatus + RUNNING + } + final val SUCCEEDED = { + case object SUCCEEDED extends JobStatus + SUCCEEDED + } + final val FAILED = { + case object FAILED extends JobStatus + FAILED + } + final val UNKNOWN = { + case object UNKNOWN extends JobStatus + UNKNOWN + } + + val values = Seq( + RUNNING, + SUCCEEDED, + FAILED, + UNKNOWN + ) + + private[spark] def fromInternalStatus(s: JobExecutionStatus): JobStatus = { + JobStatus.parse(s.name()).get + } +} + +sealed abstract class StageStatus extends SparkEnum +object StageStatus extends JerseyEnum[StageStatus] { + final val Active = { + case object Active extends StageStatus + Active + } + + final val Complete = { + case object Complete extends StageStatus + Complete + } + + final val Failed = { + case object Failed extends StageStatus + Failed + } + + final val Pending = { + case object Pending extends StageStatus + Pending + } + + val values = Seq( + Active, + Complete, + Failed, + Pending + ) +} + +sealed abstract class ApplicationStatus extends SparkEnum 
+object ApplicationStatus extends JerseyEnum[ApplicationStatus] { + final val COMPLETED = { + case object COMPLETED extends ApplicationStatus + COMPLETED + } + + final val RUNNING = { + case object RUNNING extends ApplicationStatus + RUNNING + } + + val values = Seq( + COMPLETED, + RUNNING + ) +} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 4a31fb5034666..af724875d20e0 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -18,30 +18,27 @@ package org.apache.spark.ui import java.net.URL -import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConversions._ import scala.xml.Node +import org.apache.sparktest.TestTags.ActiveTag import org.json4s._ import org.json4s.jackson.JsonMethods -import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.openqa.selenium.{By, WebDriver} +import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.selenium.WebBrowser import org.scalatest.time.SpanSugar._ - -import org.apache.spark.LocalSparkContext._ import org.apache.spark._ +import org.apache.spark.LocalSparkContext._ import org.apache.spark.api.java.StorageLevels import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.status.api.StageStatus -import org.apache.spark.status.api.v1.CustomObjectMapper -import org.apache.sparktest.TestTags.ActiveTag - +import org.apache.spark.status.api.v1.{CustomObjectMapper, StageStatus} /** * Selenium tests for the Spark Web UI. 
@@ -134,7 +131,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } val stageJson = getJson(sc.ui.get, "stages") stageJson.children.length should be (1) - (stageJson \ "status").extract[String] should be (StageStatus.Failed.name()) + (stageJson \ "status").extract[String] should be (StageStatus.Failed.toString) // Regression test for SPARK-2105 class NotSerializable @@ -266,7 +263,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before JInt(attemptId) <- stage \ "attemptId" } { val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete - status should be (exp.name()) + status should be (exp.toString) } for { @@ -275,7 +272,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } { val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId") - (stageJson \ "status").extract[String] should be (exp.name()) + (stageJson \ "status").extract[String] should be (exp.toString) } } }