Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Mar 16, 2015
1 parent 18a8c45 commit b86e2b0
Show file tree
Hide file tree
Showing 19 changed files with 73 additions and 112 deletions.
20 changes: 2 additions & 18 deletions core/src/main/java/org/apache/spark/JobExecutionStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,15 @@

package org.apache.spark;

import com.google.common.base.Joiner;

import java.util.Arrays;
import org.apache.spark.status.api.EnumUtil;

public enum JobExecutionStatus {
RUNNING,
SUCCEEDED,
FAILED,
UNKNOWN;


private static String VALID_VALUES = Joiner.on(", ").join(
Arrays.asList(JobExecutionStatus.values()));

public static JobExecutionStatus fromString(String str) {
if (str == null) {
return null;
}
try {
JobExecutionStatus res = valueOf(str.toUpperCase());
return res;
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(
String.format("Illegal type='%s'. Supported type values: %s",
str, VALID_VALUES));
}
return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@

package org.apache.spark.status.api;

import com.google.common.base.Joiner;

import java.util.Arrays;

public enum ApplicationStatus {
COMPLETED,
RUNNING;
COMPLETED,
RUNNING;

public static ApplicationStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
}
public static ApplicationStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
}

}
26 changes: 13 additions & 13 deletions core/src/main/java/org/apache/spark/status/api/EnumUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import java.util.Arrays;

/** Helpers for parsing enum constants from user-supplied strings. */
public class EnumUtil {
  /**
   * Returns the constant of {@code clz} whose name equals {@code str},
   * ignoring case.
   *
   * @param clz the enum class to search
   * @param str the candidate constant name; may be null
   * @return the matching constant, or null when {@code str} is null
   * @throws IllegalArgumentException if no constant matches; the message
   *         lists the supported values
   */
  public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
    E[] constants = clz.getEnumConstants();
    if (str == null) {
      return null;
    }
    for (E e : constants) {
      if (e.name().equalsIgnoreCase(str)) {
        return e;
      }
    }
    // Join the legal values lazily, only on the failure path.
    // (Stream joining replaces the Guava Joiner the original pulled in;
    // it produces the identical ", "-separated toString() list.)
    throw new IllegalArgumentException(
      String.format("Illegal type='%s'. Supported type values: %s",
        str, Arrays.stream(constants)
          .map(Object::toString)
          .collect(Collectors.joining(", "))));
  }
}
14 changes: 7 additions & 7 deletions core/src/main/java/org/apache/spark/status/api/StageStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package org.apache.spark.status.api;

public enum StageStatus {
Active,
Complete,
Failed,
Pending;
Active,
Complete,
Failed,
Pending;

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
}
public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class HistoryServer(
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
logError("bad path info!")
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
s"Unexpected path info in request (URI = ${req.getRequestURI()}")
return
Expand Down Expand Up @@ -163,7 +162,7 @@ class HistoryServer(
def getApplicationList(refresh: Boolean) = provider.getListing(refresh)

def getApplicationInfoList: Seq[ApplicationInfo] = {
getApplicationList(true).map{ApplicationsListResource.appHistoryInfoToPublicAppInfo}.toSeq
getApplicationList(true).map { ApplicationsListResource.appHistoryInfoToPublicAppInfo }.toSeq
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ class MasterWebUI(val master: Master, requestedPort: Int)
val state = masterPage.getMasterState
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val completedApps = state.completedApps.sortBy(_.endTime).reverse
activeApps.map{ApplicationsListResource.convertApplicationInfo(_, false)} ++
completedApps.map{ApplicationsListResource.convertApplicationInfo(_, true)}
activeApps.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
completedApps.map { ApplicationsListResource.convertApplicationInfo(_, true) }
}

def getSparkUI(appId: String): Option[SparkUI] = {
val state = masterPage.getMasterState
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val completedApps = state.completedApps.sortBy(_.endTime).reverse
(activeApps ++ completedApps).find{_.id == appId}.flatMap{
(activeApps ++ completedApps).find { _.id == appId }.flatMap {
master.rebuildSparkUI
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class AllJobsResource(uiRoot: UIRoot) {
@PathParam("appId") appId: String,
@QueryParam("status") statuses: java.util.List[JobExecutionStatus]
): Seq[JobData] = {
uiRoot.withSparkUI(appId){ui =>
val statusToJobs = ui.jobProgressListener.synchronized{
uiRoot.withSparkUI(appId) { ui =>
val statusToJobs = ui.jobProgressListener.synchronized {
Seq(
JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
Expand Down Expand Up @@ -70,8 +70,8 @@ object AllJobsResource {
val lastStageData = lastStageInfo.flatMap { s =>
listener.stageIdToData.get((s.stageId, s.attemptId))
}
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description)
val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap { _.description }
JobData(
jobId = job.jobId,
name = lastStageName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object AllRDDResource {
listener: StorageListener,
includeDetails: Boolean): Option[RDDStorageInfo] = {
val storageStatusList = listener.storageStatusList
listener.rddInfoList.find(_.id == rddId).map{rddInfo =>
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
}
}
Expand All @@ -61,37 +61,37 @@ object AllRDDResource {
storageStatusList: Seq[StorageStatus],
includeDetails: Boolean
): RDDStorageInfo = {
val workers = storageStatusList.map((rddId, _))
val workers = storageStatusList.map { (rddId, _) }
val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
val blocks = storageStatusList
.flatMap(_.rddBlocksById(rddId))
.sortWith(_._1.name < _._1.name)
.flatMap { _.rddBlocksById(rddId) }
.sortWith { _._1.name < _._1.name }
.map { case (blockId, status) =>
(blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
}


val dataDistribution = if (includeDetails) {
Some(storageStatusList.map{status =>
Some(storageStatusList.map { status =>
RDDDataDistribution(
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
diskUsed = status.diskUsedByRdd(rddId)
)})
) } )
} else {
None
}
val partitions = if (includeDetails) {
Some(blocks.map{ case(id, block, locations) =>
Some(blocks.map { case(id, block, locations) =>
RDDPartitionInfo(
blockName = id.name,
storageLevel = block.storageLevel.description,
memoryUsed = block.memSize,
diskUsed = block.diskSize,
executors = locations
)
})
} )
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ object AllStagesResource {
): StageData = {

val taskData = if(includeDetails) {
Some(stageUiData.taskData.map{case(k,v) => k -> convertTaskData(v)})
Some(stageUiData.taskData.map { case(k,v) => k -> convertTaskData(v) } )
} else {
None
}
val executorSummary = if(includeDetails) {
Some(stageUiData.executorSummary.map{case(k,summary) => k ->
ExecutorStageSummary(
Some(stageUiData.executorSummary.map { case(k,summary) =>
k -> ExecutorStageSummary(
taskTime = summary.taskTime,
failedTasks = summary.failedTasks,
succeededTasks = summary.succeededTasks,
Expand Down Expand Up @@ -114,7 +114,7 @@ object AllStagesResource {

def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
val listener = ui.stagesTab.listener
listener.synchronized{
listener.synchronized {
Seq(
StageStatus.Active -> listener.activeStages.values.toSeq,
StageStatus.Complete -> listener.completedStages.reverse.toSeq,
Expand All @@ -136,7 +136,7 @@ object AllStagesResource {
taskLocality = uiData.taskInfo.taskLocality.toString(),
speculative = uiData.taskInfo.speculative,
errorMessage = uiData.errorMessage,
taskMetrics = uiData.taskMetrics.map{convertUiTaskMetrics}
taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
)
}

Expand All @@ -149,10 +149,10 @@ object AllStagesResource {
resultSerializationTime = internal.resultSerializationTime,
memoryBytesSpilled = internal.memoryBytesSpilled,
diskBytesSpilled = internal.diskBytesSpilled,
inputMetrics = internal.inputMetrics.map{convertInputMetrics},
outputMetrics = Option(internal.outputMetrics).flatten.map{convertOutputMetrics},
shuffleReadMetrics = internal.shuffleReadMetrics.map{convertShuffleReadMetrics},
shuffleWriteMetrics = internal.shuffleWriteMetrics.map{convertShuffleWriteMetrics}
inputMetrics = internal.inputMetrics.map { convertInputMetrics },
outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics },
shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics },
shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ApplicationListResource(uiRoot: UIRoot) {
}
val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
allApps.filter{app =>
allApps.filter { app =>
val statusOk = (app.completed && includeCompleted) ||
(!app.completed && includeRunning)
val dateOk = app.startTime.getTime >= minDate.timestamp &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class OneApplicationResource(uiRoot: UIRoot) {

@GET
def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
val apps = uiRoot.getApplicationInfoList.find{_.id == appId}
val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class OneRDDResource(uiRoot: UIRoot) {
@PathParam("rddId") rddId: Int
): RDDStorageInfo = {
uiRoot.withSparkUI(appId) { ui =>
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse{
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
throw new IllegalArgumentException("no rdd found w/ id " + rddId)
}
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ class OneStageResource(uiRoot: UIRoot) {
uiRoot.withSparkUI(appId) { ui =>
val listener = ui.stagesTab.listener
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val oneStage = stageAndStatus.flatMap{case (status, stages) =>
val matched = stages.find{_.stageId == stageId}
matched.map{status -> _}
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
val matched = stages.find { _.stageId == stageId }
matched.map { status -> _ }
}.headOption
oneStage match {
case Some((status,stageInfo)) =>
val stageUiData = listener.synchronized{
val stageUiData = listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
getOrElse{ throw new SparkException("failed to get full stage data for stage: " +
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
stageInfo.stageId + ":" + stageInfo.attemptId)
}
)
}
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
includeDetails = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.sun.jersey.spi.container.{ContainerRequest, ContainerRequestFilter}

class SecurityFilter extends ContainerRequestFilter with UIRootFromServletContext {
def filter(req: ContainerRequest): ContainerRequest = {
val user = Option(req.getUserPrincipal).map{_.getName}.orNull
val user = Option(req.getUserPrincipal).map { _.getName }.orNull
if (uiRoot.securityManager.checkUIViewPermissions(user)) {
req
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ import scala.util.Try

/**
 * JAX-RS parameter wrapper that accepts a date either as a zoned timestamp
 * ("yyyy-MM-dd'T'HH:mm:ss.SSSz") or as a bare date ("yyyy-MM-dd") and
 * exposes it as epoch milliseconds via `timestamp`.
 *
 * Throws a 400 WebApplicationException at construction when the input
 * matches neither format.
 */
private[api] class SimpleDateParam(val originalValue: String) {
  val timestamp: Long = {
    // Try each supported format in declaration order; the first format
    // that parses wins.  NOTE(review): the winning format parses the
    // value twice (once in the guard, once in the body) — harmless but
    // redundant; confirm before optimizing.
    SimpleDateParam.formats.collectFirst {
      case fmt if Try { fmt.parse(originalValue) }.isSuccess =>
        fmt.parse(originalValue).getTime()
    }.getOrElse(
      // No format matched: surface a 400 Bad Request to the REST client.
      throw new WebApplicationException(
        Response
          .status(Status.BAD_REQUEST)
          .entity("Couldn't parse date: " + originalValue)
          .build()
      )
    )
  }
}

/** Companion holding the date formats accepted by [[SimpleDateParam]]. */
private[api] object SimpleDateParam {
  // Accepted input formats, most specific first.
  // NOTE(review): SimpleDateFormat instances are not thread-safe, and these
  // are shared across all requests — confirm the container serializes access
  // or switch to per-call instances / a thread-local.
  val formats = Seq(
    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
    "yyyy-MM-dd"
  ).map { new SimpleDateFormat(_) }
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet

import scala.collection.mutable.HashMap

object UIData {
private[spark] object UIData {

class ExecutorSummary {
var taskTime : Long = 0
Expand Down
Loading

0 comments on commit b86e2b0

Please sign in to comment.