Skip to content

Commit

Permalink
handle multiple attempts per app
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed May 1, 2015
1 parent b87cd63 commit c9bae1c
Show file tree
Hide file tree
Showing 26 changed files with 779 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,26 @@ import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.ui.jobs.UIData.JobUIData

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllJobsResource(uiRoot: UIRoot) {
private[v1] class AllJobsResource(ui: SparkUI) {

@GET
def jobsList(
@PathParam("appId") appId: String,
@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
uiRoot.withSparkUI(appId) { ui =>
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val adjStatuses: JList[JobExecutionStatus] = {
if (statuses.isEmpty) {
Arrays.asList(JobExecutionStatus.values(): _*)
} else {
statuses
}
def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val adjStatuses: JList[JobExecutionStatus] = {
if (statuses.isEmpty) {
Arrays.asList(JobExecutionStatus.values(): _*)
} else {
statuses
}
val jobInfos = for {
(status, jobs) <- statusToJobs
job <- jobs if adjStatuses.contains(status)
} yield {
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}
jobInfos.sortBy{- _.jobId}
}
val jobInfos = for {
(status, jobs) <- statusToJobs
job <- jobs if adjStatuses.contains(status)
} yield {
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}
jobInfos.sortBy{- _.jobId}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,23 @@
*/
package org.apache.spark.status.api.v1

import javax.ws.rs.{GET, PathParam, Produces}
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.storage.StorageListener

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllRDDResource(uiRoot: UIRoot) {
private[v1] class AllRDDResource(ui: SparkUI) {

@GET
def jobsList(@PathParam("appId") appId: String): Seq[RDDStorageInfo] = {
uiRoot.withSparkUI(appId) { ui =>
val storageStatusList = ui.storageListener.storageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
includeDetails = false)
}

def rddList(): Seq[RDDStorageInfo] = {
val storageStatusList = ui.storageListener.storageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
includeDetails = false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,27 @@ import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
import org.apache.spark.util.Distribution

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllStagesResource(uiRoot: UIRoot) {
private[v1] class AllStagesResource(ui: SparkUI) {

@GET
def stageList(
@PathParam("appId") appId: String,
@QueryParam("status") statuses: JList[StageStatus]
): Seq[StageData] = {
uiRoot.withSparkUI(appId) { ui =>
val listener = ui.jobProgressListener
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val adjStatuses = {
if (statuses.isEmpty()) {
Arrays.asList(StageStatus.values(): _*)
} else {
statuses
}
def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
val listener = ui.jobProgressListener
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
val adjStatuses = {
if (statuses.isEmpty()) {
Arrays.asList(StageStatus.values(): _*)
} else {
statuses
}
for {
(status, stageList) <- stageAndStatus
stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
stageUiData: StageUIData <- listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
}
} yield {
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
}
for {
(status, stageList) <- stageAndStatus
stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
stageUiData: StageUIData <- listener.synchronized {
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
}
} yield {
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ package org.apache.spark.status.api.v1
import javax.ws.rs.{GET, PathParam, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.exec.ExecutorsPage

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ExecutorListResource(uiRoot: UIRoot) {
private[v1] class ExecutorListResource(ui: SparkUI) {

@GET
def jobsList(@PathParam("appId") appId: String): Seq[ExecutorSummary] = {
uiRoot.withSparkUI(appId) { ui =>
val listener = ui.executorsListener
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
}
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,117 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
new OneApplicationResource(uiRoot)
}

@Path("applications/{appId}/{attemptId}/jobs")
def getJobs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): AllJobsResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new AllJobsResource(ui)
}
}

@Path("applications/{appId}/jobs")
def getJobs(): AllJobsResource = {
new AllJobsResource(uiRoot)
def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new AllJobsResource(ui)
}
}

@Path("applications/{appId}/jobs/{jobId: \\d+}")
def getJob(): OneJobResource = {
new OneJobResource(uiRoot)
def getJob(@PathParam("appId") appId: String): OneJobResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new OneJobResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
def getJob(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): OneJobResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new OneJobResource(ui)
}
}

@Path("applications/{appId}/executors")
def getExecutors(): ExecutorListResource = {
new ExecutorListResource(uiRoot)
def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new ExecutorListResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/executors")
def getExecutors(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): ExecutorListResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new ExecutorListResource(ui)
}
}


@Path("applications/{appId}/stages")
def getStages(): AllStagesResource= {
new AllStagesResource(uiRoot)
def getStages(@PathParam("appId") appId: String): AllStagesResource= {
uiRoot.withSparkUI(appId, None) { ui =>
new AllStagesResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/stages")
def getStages(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): AllStagesResource= {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new AllStagesResource(ui)
}
}

@Path("applications/{appId}/stages/{stageId: \\d+}")
def getStage(): OneStageResource= {
new OneStageResource(uiRoot)
def getStage(@PathParam("appId") appId: String): OneStageResource= {
uiRoot.withSparkUI(appId, None) { ui =>
new OneStageResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
def getStage(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): OneStageResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new OneStageResource(ui)
}
}

@Path("applications/{appId}/storage/rdd")
def getRdds(): AllRDDResource = {
new AllRDDResource(uiRoot)
def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new AllRDDResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/storage/rdd")
def getRdds(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): AllRDDResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new AllRDDResource(ui)
}
}

@Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
def getRdd(): OneRDDResource = {
new OneRDDResource(uiRoot)
def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
uiRoot.withSparkUI(appId, None) { ui =>
new OneRDDResource(ui)
}
}

@Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
def getRdd(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): OneRDDResource = {
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
new OneRDDResource(ui)
}
}

}
Expand Down Expand Up @@ -119,8 +197,9 @@ private[spark] trait UIRoot {
* Get the spark UI with the given appID, and apply a function
* to it. If there is no such app, throw an appropriate exception
*/
def withSparkUI[T](appId: String)(f: SparkUI => T): T = {
getSparkUI(appId) match {
def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
getSparkUI(appKey) match {
case Some(ui) =>
f(ui)
case None => throw new NotFoundException("no such app: " + appId)
Expand All @@ -130,10 +209,13 @@ private[spark] trait UIRoot {
}

private[v1] object UIRootFromServletContext {

private val attribute = getClass.getCanonicalName

def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
contextHandler.setAttribute(attribute, uiRoot)
}

def getUiRoot(context: ServletContext): UIRoot = {
context.getAttribute(attribute).asInstanceOf[UIRoot]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@ import javax.ws.rs.{PathParam, GET, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.JobUIData

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneJobResource(uiRoot: UIRoot) {
private[v1] class OneJobResource(ui: SparkUI) {

@GET
def jobsList(@PathParam("appId") appId: String, @PathParam("jobId") jobId: Int): JobData = {
uiRoot.withSparkUI(appId) { ui =>
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
jobOpt.map { job =>
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}.getOrElse {
throw new NotFoundException("unknown job: " + jobId)
}
def oneJob(@PathParam("jobId") jobId: Int): JobData = {
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
jobOpt.map { job =>
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}.getOrElse {
throw new NotFoundException("unknown job: " + jobId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ package org.apache.spark.status.api.v1
import javax.ws.rs.{PathParam, GET, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.ui.SparkUI

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneRDDResource(uiRoot: UIRoot) {
private[v1] class OneRDDResource(ui: SparkUI) {

@GET
def rddData(
@PathParam("appId") appId: String,
@PathParam("rddId") rddId: Int): RDDStorageInfo = {
uiRoot.withSparkUI(appId) { ui =>
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
throw new NotFoundException(s"no rdd found w/ id $rddId")
)
}
}
@GET
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
throw new NotFoundException(s"no rdd found w/ id $rddId")
)
}

}
Loading

0 comments on commit c9bae1c

Please sign in to comment.