From d859efc34c9a5f07bae7eca7b4ab72fa19fb7e29 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 15 Feb 2014 14:01:14 -0800
Subject: [PATCH] BlockManagerUI: Add JSON functionality

---
 .../apache/spark/storage/StorageUtils.scala   |  4 +-
 .../scala/org/apache/spark/ui/SparkUI.scala   | 52 ++++++++++++++++-
 .../apache/spark/ui/env/EnvironmentUI.scala   | 21 +++----
 .../apache/spark/ui/exec/ExecutorsUI.scala    | 51 +++--------------
 .../spark/ui/storage/BlockManagerUI.scala     | 17 ++++--
 .../apache/spark/ui/storage/IndexPage.scala   | 30 ++++++----
 .../org/apache/spark/ui/storage/RDDPage.scala | 60 ++++++++++-------
 7 files changed, 133 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 54be19c615ac8..5623290112edf 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -137,8 +137,8 @@ object StorageUtils {
   }
 
   /* Filters storage status by a given RDD id. */
-  def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int)
-    : Array[StorageStatus] = {
+  def filterStorageStatusByRDD(storageStatusList: Seq[StorageStatus], rddId: Int)
+    : Seq[StorageStatus] = {
 
     storageStatusList.map { status =>
       val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus]
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index fecc75e635ce7..080d39f06abcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -26,9 +26,10 @@
 import org.apache.spark.ui.storage.BlockManagerUI
 import org.apache.spark.ui.jobs.JobProgressUI
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{FileLogger, Utils}
-import org.apache.spark.scheduler.{SparkListenerEvent, SparkListener}
+import org.apache.spark.scheduler._
 import net.liftweb.json.JsonAST._
+import org.apache.spark.storage.StorageStatus
 
 /** Top level user interface for Spark */
 private[spark] class SparkUI(sc: SparkContext) extends Logging {
@@ -72,6 +73,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   // DAGScheduler() requires that the port of this server is known
   // This server must register all handlers, including JobProgressUI, before binding
   // JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+  storage.start()
   jobs.start()
   env.start()
   exec.start()
@@ -90,9 +92,57 @@ private[spark] object SparkUI {
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
 }
 
+/** A SparkListener for logging events, one file per job */
 private[spark] class UISparkListener(name: String) extends SparkListener {
   protected val logger = new FileLogger(name)
+
   protected def logEvent(event: SparkListenerEvent) = {
     logger.logLine(compactRender(event.toJson))
   }
+
+  override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
+}
+
+/**
+ * A SparkListener that fetches storage information from SparkEnv and logs it as an event.
+ *
+ * By default, a fetch occurs every time a stage event is triggered. This need not be the
+ * only trigger, however: a stage can be arbitrarily long, so a failure in the middle of
+ * a stage would cause the storage status for that stage to be lost.
+ */
+private[spark] class StorageStatusFetchSparkListener(
+    name: String,
+    sc: SparkContext)
+  extends UISparkListener(name) {
+  var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus
+
+  /**
+   * Fetch storage information from SparkEnv, which involves a query to the driver. This is
+   * expensive and should be invoked sparingly.
+   */
+  def fetchStorageStatus() {
+    val storageStatus = sc.getExecutorStorageStatus
+    val event = new SparkListenerStorageStatusFetch(storageStatus)
+    onStorageStatusFetch(event)
+  }
+
+  /**
+   * Update local state with the fetch result, and log the appropriate event
+   */
+  protected def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
+    storageStatusList = storageStatusFetch.storageStatusList
+    logEvent(storageStatusFetch)
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+    fetchStorageStatus()
+    logger.flush()
+  }
+
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
+    fetchStorageStatus()
+    logger.flush()
+  }
 }
\ No newline at end of file
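Note on the logging format: UISparkListener writes each event as one compact JSON object per line through FileLogger. A minimal lift-json sketch of that idiom follows; the event shape here is illustrative only, since the real fields come from each event's toJson implementation, which this patch assumes is defined elsewhere in the series.

    import net.liftweb.json._
    import net.liftweb.json.JsonDSL._

    // Illustrative payload only; real events supply their own toJson.
    val event: JValue =
      ("Event" -> "SparkListenerStorageStatusFetch") ~
      ("Executor Count" -> 2)

    // One compact JSON object per line, as in logger.logLine(compactRender(event.toJson)):
    println(compact(render(event)))
    // {"Event":"SparkListenerStorageStatusFetch","Executor Count":2}

One object per line keeps the log greppable and lets a reader replay events incrementally.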
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 4a7d3e2ef1df7..bc9bccce15e26 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -32,7 +32,6 @@ import org.apache.spark.ui.{UISparkListener, UIUtils}
 import org.apache.spark.ui.Page.Environment
 
 private[spark] class EnvironmentUI(sc: SparkContext) {
-
   private var _listener: Option[EnvironmentListener] = None
 
   def listener = _listener.get
@@ -45,9 +44,6 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
     ("/environment", (request: HttpServletRequest) => render(request))
   )
 
-  /**
-   * Render an HTML page that encodes environment information
-   */
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.loadEnvironment()
     val runtimeInformationTable = UIUtils.listingTable(
@@ -76,7 +72,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
   private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
 
   /**
-   * A SparkListener that logs information to be displayed on the Environment UI.
+   * A SparkListener that prepares and logs information to be displayed on the Environment UI
    */
   private[spark] class EnvironmentListener extends UISparkListener("environment-ui") {
     var jvmInformation: Seq[(String, String)] = Seq()
@@ -84,9 +80,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
     var systemProperties: Seq[(String, String)] = Seq()
     var classpathEntries: Seq[(String, String)] = Seq()
 
-    /**
-     * Gather JVM, spark, system and classpath properties
-     */
+    /** Gather JVM, spark, system and classpath properties */
    def loadEnvironment() = {
       val jvmInformation = Seq(
         ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
@@ -116,9 +110,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
       onLoadEnvironment(loadEnvironment)
     }
 
-    /**
-     * Prepare environment information for UI to render, and log the corresponding event
-     */
+    /** Prepare environment information for UI to render, and log the corresponding event */
     def onLoadEnvironment(loadEnvironment: SparkListenerLoadEnvironment) = {
       jvmInformation = loadEnvironment.jvmInformation
       sparkProperties = loadEnvironment.sparkProperties
@@ -128,8 +120,9 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
       logger.flush()
     }
 
-    override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()
-
-    override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
+    override def onJobStart(jobStart: SparkListenerJobStart) = {
+      super.onJobStart(jobStart)
+      loadEnvironment()
+    }
   }
 }
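The onJobStart override above illustrates the contract for UISparkListener subclasses: call super.onJobStart first so the underlying FileLogger is opened before anything is logged. A hedged sketch of another subclass following the same convention (the class and field names are hypothetical):

    package org.apache.spark.ui

    import org.apache.spark.scheduler.SparkListenerJobStart

    // Hypothetical subclass: inherits UISparkListener's open-on-job-start /
    // close-on-job-end file handling and adds its own state.
    private[spark] class JobCountListener extends UISparkListener("job-count-ui") {
      var jobsSeen = 0

      override def onJobStart(jobStart: SparkListenerJobStart) = {
        super.onJobStart(jobStart)  // opens the log file before anything is logged
        jobsSeen += 1
      }
    }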
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index a57b8b393679f..460be50f9dd66 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -25,17 +25,14 @@ import scala.xml.Node
 import org.eclipse.jetty.server.Handler
 
 import org.apache.spark.{Logging, SparkContext, ExceptionFailure}
-import org.apache.spark.scheduler._
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{UISparkListener, UIUtils}
+import org.apache.spark.ui.{StorageStatusFetchSparkListener, UIUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SparkListenerTaskEnd
 import org.apache.spark.scheduler.SparkListenerTaskStart
-import org.apache.spark.storage.StorageStatus
 
 private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
-
   private var _listener: Option[ExecutorsListener] = None
 
   def listener = _listener.get
@@ -48,9 +45,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
     ("/executors", (request: HttpServletRequest) => render(request))
   )
 
-  /**
-   * Render an HTML page that encodes executor information
-   */
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.fetchStorageStatus()
     val storageStatusList = listener.storageStatusList
@@ -80,9 +74,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
     UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
   }
 
-  /**
-   * Header fields in the executors table
-   */
+  /** Header fields for the executors table */
   private def execHeader = Seq(
     "Executor ID",
     "Address",
@@ -97,14 +89,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
     "Shuffle Read",
     "Shuffle Write")
 
-  /**
-   * Render an HTML table row representing an executor
-   */
+  /** Render an HTML row representing an executor */
   private def execRow(values: Map[String, String]): Seq[Node] = {
     val maximumMemory = values("Maximum Memory")
     val memoryUsed = values("Memory Used")
     val diskUsed = values("Disk Used")
-
     <tr>
       <td>{values("Executor ID")}</td>
       <td>{values("Address")}</td>
@@ -126,9 +115,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
     </tr>
   }
 
-  /**
-   * Represent an executor's info as a map given a storage status index
-   */
+  /** Represent an executor's info as a map given a storage status index */
   private def getExecInfo(statusId: Int): Map[String, String] = {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
@@ -169,40 +156,16 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
   }
 
   /**
-   * A SparkListener that logs information to be displayed on the Executors UI
+   * A SparkListener that prepares and logs information to be displayed on the Executors UI
    */
-  private[spark] class ExecutorsListener extends UISparkListener("executors-ui") {
+  private[spark]
+  class ExecutorsListener extends StorageStatusFetchSparkListener("executors-ui", sc) {
     val executorToTasksActive = mutable.HashMap[String, Int]()
     val executorToTasksComplete = mutable.HashMap[String, Int]()
     val executorToTasksFailed = mutable.HashMap[String, Int]()
     val executorToDuration = mutable.HashMap[String, Long]()
     val executorToShuffleRead = mutable.HashMap[String, Long]()
     val executorToShuffleWrite = mutable.HashMap[String, Long]()
-    var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus
-
-    def fetchStorageStatus() {
-      val event = new SparkListenerStorageStatusFetch(sc.getExecutorStorageStatus)
-      onStorageStatusFetch(event)
-    }
-
-    def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
-      storageStatusList = storageStatusFetch.storageStatusList
-      logEvent(storageStatusFetch)
-    }
-
-    override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()
-
-    override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
-
-    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-      fetchStorageStatus()
-      logger.flush()
-    }
-
-    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-      fetchStorageStatus()
-      logger.flush()
-    }
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = formatExecutorId(taskStart.taskInfo.executorId)
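With the fetch-and-log plumbing hoisted into StorageStatusFetchSparkListener, a page-specific listener now only declares its own state and task hooks, as ExecutorsListener does above. A minimal sketch of the reduced surface a new subclass would need (the names here are hypothetical):

    package org.apache.spark.ui

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.SparkListenerTaskEnd

    // Hypothetical subclass: storage status fetching, JSON logging, and
    // stage-boundary flushing are all inherited from the parent.
    private[spark] class TaskCountListener(sc: SparkContext)
      extends StorageStatusFetchSparkListener("task-count-ui", sc) {
      var tasksEnded = 0

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        tasksEnded += 1
      }
    }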
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index 39f422dd6b90f..05c17597e52e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -17,22 +17,29 @@
 
 package org.apache.spark.ui.storage
 
-import scala.concurrent.duration._
-
 import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.Handler
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.StorageStatusFetchSparkListener
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
-  val indexPage = new IndexPage(this)
-  val rddPage = new RDDPage(this)
+  private var _listener: Option[StorageStatusFetchSparkListener] = None
+  private val indexPage = new IndexPage(this)
+  private val rddPage = new RDDPage(this)
+
+  def listener = _listener.get
+
+  def start() {
+    _listener = Some(new StorageStatusFetchSparkListener("block-manager-ui", sc))
+    sc.addSparkListener(listener)
+  }
 
   def getHandlers = Seq[(String, Handler)](
     ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
     ("/storage", (request: HttpServletRequest) => indexPage.render(request))
   )
-}
+}
\ No newline at end of file
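The Option-backed listener plus start() mirrors how SparkUI wires the other components: the pages are constructed eagerly, but the listener is only created and registered once start() runs, which SparkUI.start() now does for storage as well. A usage sketch, assuming a live SparkContext sc:

    val storageUI = new BlockManagerUI(sc)
    storageUI.start()  // creates the listener and registers it with sc
    val statuses = storageUI.listener.storageStatusList

Calling listener before start() would throw a NoSuchElementException, since the accessor is Option.get on a None.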
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index 109a7d4094c0a..8b99d9872502a 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -28,26 +28,30 @@ import org.apache.spark.util.Utils
 
 /** Page showing list of RDD's currently stored in the cluster */
 private[spark] class IndexPage(parent: BlockManagerUI) {
-  val sc = parent.sc
+  private val sc = parent.sc
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val storageStatusList = sc.getExecutorStorageStatus
-    // Calculate macro-level statistics
+    parent.listener.fetchStorageStatus()
+    val storageStatusList = parent.listener.storageStatusList
 
-    val rddHeaders = Seq(
-      "RDD Name",
-      "Storage Level",
-      "Cached Partitions",
-      "Fraction Cached",
-      "Size in Memory",
-      "Size on Disk")
+    // Calculate macro-level statistics
     val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-    val content = listingTable(rddHeaders, rddRow, rdds)
+    val content = listingTable(rddHeader, rddRow, rdds)
 
-    headerSparkPage(content, parent.sc, "Storage ", Storage)
+    headerSparkPage(content, sc, "Storage ", Storage)
   }
 
-  def rddRow(rdd: RDDInfo): Seq[Node] = {
+  /** Header fields for the RDD table */
+  private def rddHeader = Seq(
+    "RDD Name",
+    "Storage Level",
+    "Cached Partitions",
+    "Fraction Cached",
+    "Size in Memory",
+    "Size on Disk")
+
+  /** Render an HTML row representing an RDD */
+  private def rddRow(rdd: RDDInfo): Seq[Node] = {
     <tr>
       <td>
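IndexPage now follows the header/row convention of the other pages: a private header Seq plus a private row renderer, handed to listingTable. A simplified stand-in for UIUtils.listingTable showing the shape of that contract (a sketch, not the actual implementation):

    import scala.xml.Node

    // Sketch of the listingTable contract: headers plus a row renderer
    // applied to each row of data.
    def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] =
      <table>
        <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
        <tbody>{rows.map(makeRow)}</tbody>
      </table>

    // Mirrors listingTable(rddHeader, rddRow, rdds) above:
    val table = listingTable[(String, Int)](
      Seq("RDD Name", "Cached Partitions"),
      { case (name, n) => <tr><td>{name}</td><td>{n}</td></tr> },
      Seq(("myRdd", 8)))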
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index b83cd54f3c39a..67f195a1e17ef 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -27,31 +27,29 @@ import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
 import org.apache.spark.util.Utils
 
-
 /** Page showing storage details for a given RDD */
 private[spark] class RDDPage(parent: BlockManagerUI) {
-  val sc = parent.sc
+  private val sc = parent.sc
 
   def render(request: HttpServletRequest): Seq[Node] = {
+    parent.listener.fetchStorageStatus()
+    val storageStatusList = parent.listener.storageStatusList
     val id = request.getParameter("id").toInt
-    val storageStatusList = sc.getExecutorStorageStatus
     val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id)
     val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
 
-    val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+    // Worker table
     val workers = filteredStorageStatusList.map((id, _))
-    val workerTable = listingTable(workerHeaders, workerRow, workers)
-
-    val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
-      "Executors")
+    val workerTable = listingTable(workerHeader, workerRow, workers)
 
+    // Block table
     val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.
       sortWith(_._1.name < _._1.name)
     val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
     val blocks = blockStatuses.map { case(id, status) =>
      (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
     }
-    val blockTable = listingTable(blockHeaders, blockRow, blocks)
+    val blockTable = listingTable(blockHeader, blockRow, blocks)
 
     val content =
@@ -95,10 +93,38 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
       </div>;
 
-    headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
+    headerSparkPage(content, sc, "RDD Storage Info for " + rddInfo.name, Storage)
+  }
+
+  /** Header fields for the worker table */
+  private def workerHeader = Seq(
+    "Host",
+    "Memory Usage",
+    "Disk Usage")
+
+  /** Header fields for the block table */
+  private def blockHeader = Seq(
+    "Block Name",
+    "Storage Level",
+    "Size in Memory",
+    "Size on Disk",
+    "Executors")
+
+  /** Render an HTML row representing a worker */
+  private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = {
+    val (rddId, status) = worker
+    <tr>
+      <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+      <td>
+        {Utils.bytesToString(status.memUsedByRDD(rddId))}
+        ({Utils.bytesToString(status.memRemaining)} Remaining)
+      </td>
+      <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
+    </tr>
   }
 
-  def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = {
+  /** Render an HTML row representing a block */
+  private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = {
     val (id, block, locations) = row
     <tr>
       <td>{id}</td>
@@ -116,16 +142,4 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
       </td>
     </tr>
   }
-
-  def workerRow(worker: (Int, StorageStatus)): Seq[Node] = {
-    val (rddId, status) = worker
-    <tr>
-      <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
-      <td>
-        {Utils.bytesToString(status.memUsedByRDD(rddId))}
-        ({Utils.bytesToString(status.memRemaining)} Remaining)
-      </td>
-      <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
-    </tr>
-  }
 }
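A closing note on the StorageUtils change at the top of this patch: widening Array[StorageStatus] to Seq[StorageStatus] lets the pages pass the listener's cached Seq directly, while existing Array call sites still compile through the standard implicit wrapping. A hedged sketch, assuming code running inside a page with a parent BlockManagerUI and a live SparkContext sc:

    // Both call sites compile against the widened signature:
    val rddId = 3
    val cached: Seq[StorageStatus] = parent.listener.storageStatusList
    val fresh: Array[StorageStatus] = sc.getExecutorStorageStatus

    StorageUtils.filterStorageStatusByRDD(cached, rddId)
    StorageUtils.filterStorageStatusByRDD(fresh, rddId)  // Array is wrapped to Seq implicitly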