@@ -119,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val serializationQuantiles =
"Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+ get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => parent.formatDuration(ms.toLong))
+ .map(ms => UIUtils.formatDuration(ms.toLong))
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -136,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val gettingResultQuantiles = "Time spent fetching task results" +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -153,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val schedulerDelayQuantiles = "Scheduler delay" +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
def getQuantileCols(data: Seq[Double]) =
@@ -204,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
Aggregated Metrics by Executor
++ executorTable.toNodeSeq ++
Tasks
++ taskTable
- UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+ parent.headerTabs, parent)
}
}
@@ -217,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
taskData match { case TaskUIData(info, metrics, exception) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
@@ -233,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else parent.formatDuration(ms)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
}.getOrElse("")
val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -252,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
{info.status} |
{info.taskLocality} |
{info.host} |
- {WebUI.formatDate(new Date(info.launchTime))} |
+ {UIUtils.formatDate(new Date(info.launchTime))} |
{formatDuration}
|
- {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
|
- {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
|
{if (shuffleRead) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac61568af52d2..d918feafd97d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,13 +23,13 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
+private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressTab) {
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private lazy val listener = parent.jobProgressListener
private lazy val isFairScheduler = parent.isFairScheduler
def toNodeSeq: Seq[Node] = {
@@ -81,14 +81,14 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => {d} {nameLink} ).getOrElse(nameLink)
val submissionTime = s.submissionTime match {
- case Some(t) => WebUI.formatDate(new Date(t))
+ case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
val duration = s.submissionTime.map { t =>
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
- val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val startedTasks =
listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala
similarity index 78%
rename from core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
rename to core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala
index a7b24ff695214..ac83f71ed31de 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala
@@ -17,38 +17,29 @@
package org.apache.spark.ui.storage
-import javax.servlet.http.HttpServletRequest
-
import scala.collection.mutable
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
+private[ui] class BlockManagerTab(parent: SparkUI) extends UITab("storage") {
val appName = parent.appName
val basePath = parent.basePath
- private val indexPage = new IndexPage(this)
- private val rddPage = new RDDPage(this)
- private var _listener: Option[BlockManagerListener] = None
-
- lazy val listener = _listener.get
-
def start() {
- _listener = Some(new BlockManagerListener(parent.storageStatusListener))
+ listener = Some(new BlockManagerListener(parent.storageStatusListener))
+ attachPage(new IndexPage(this))
+ attachPage(new RddPage(this))
+ }
+
+ def blockManagerListener: BlockManagerListener = {
+ assert(listener.isDefined, "BlockManagerTab has not started yet!")
+ listener.get.asInstanceOf[BlockManagerListener]
}
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/storage/rdd",
- (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
- createServletHandler("/storage",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
+ def headerTabs: Seq[UITab] = parent.getTabs
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index b2732de51058a..96b08d07d48d2 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -22,20 +22,19 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
+private[ui] class IndexPage(parent: BlockManagerTab) extends UIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private lazy val listener = parent.blockManagerListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
- UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
}
/** Header fields for the RDD table */
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 3f42eba4ece00..a65ba0a020bcd 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,22 +22,22 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: BlockManagerTab) extends UIPage("rdd") {
private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private lazy val listener = parent.blockManagerListener
- def render(request: HttpServletRequest): Seq[Node] = {
+ override def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+ return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+ parent.headerTabs, parent)
}
// Worker table
@@ -95,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
;
- UIUtils.headerSparkPage(
- content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+ parent.headerTabs, parent)
}
/** Header fields for the worker table */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 51b11a29cbd98..4063ce3d7ca44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming.ui
import scala.xml.Node
-import org.apache.spark.ui.Page
private[spark] object UIUtils {
|