BlockManagerUI: Add JSON functionality
andrewor14 committed Feb 15, 2014
1 parent c4cd480 commit d859efc
Showing 7 changed files with 140 additions and 102 deletions.
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -137,8 +137,8 @@ object StorageUtils {
}

/* Filters storage status by a given RDD id. */
def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int)
: Array[StorageStatus] = {
def filterStorageStatusByRDD(storageStatusList: Seq[StorageStatus], rddId: Int)
: Seq[StorageStatus] = {

storageStatusList.map { status =>
val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus]
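The signature change from Array[StorageStatus] to Seq[StorageStatus] lets callers pass any
sequence without first converting to an array. A minimal usage sketch under that reading
(storageStatusList as produced by sc.getExecutorStorageStatus elsewhere in this commit):

    val storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus
    // Keep only block entries belonging to RDD 3; every executor's status is
    // retained, just with its rddBlocks filtered down (per the map above).
    val filtered = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId = 3)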
59 changes: 58 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -26,9 +26,17 @@ import org.apache.spark.ui.storage.BlockManagerUI
import org.apache.spark.ui.jobs.JobProgressUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{FileLogger, Utils}
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListener}
import org.apache.spark.scheduler._

import net.liftweb.json.JsonAST._
import org.apache.spark.storage.StorageStatus

/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
@@ -72,6 +80,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
// DAGScheduler() requires that the port of this server is known
// This server must register all handlers, including JobProgressUI, before binding
// JobProgressUI registers a listener with SparkContext, which requires sc to initialize
storage.start()
jobs.start()
env.start()
exec.start()
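The three comments above describe a cycle that the code breaks by separating construction,
start, and bind. A hedged sketch of the assumed lifecycle (bind() is not shown in this
hunk, so its name and placement are an assumption based on those comments):

    val ui = new SparkUI(sc)
    ui.start()  // sub-UIs register their handlers and SparkContext listeners first
    ui.bind()   // only then is the server bound, so DAGScheduler can learn its port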
@@ -90,9 +99,57 @@ private[spark] object SparkUI {
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}

/** A SparkListener for logging events, one file per job */
private[spark] class UISparkListener(name: String) extends SparkListener {
protected val logger = new FileLogger(name)

protected def logEvent(event: SparkListenerEvent) = {
logger.logLine(compactRender(event.toJson))
}

override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()

override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
}
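UISparkListener gives every UI component the same persistence scheme: one FileLogger per
job, one compact JSON line per event. A minimal sketch of a subclass using it, assuming
(as logEvent above implies) that SparkListenerEvent.toJson yields a lift-json JValue;
the subclass name here is hypothetical:

    private[spark] class StageEventLogger extends UISparkListener("stage-events") {
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        logEvent(stageSubmitted)  // writes compactRender(stageSubmitted.toJson)
        logger.flush()            // flush eagerly so a mid-job crash loses little
      }
    }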

/**
* A SparkListener that fetches storage information from SparkEnv and logs it as an event.
*
 * By default, a fetch occurs every time a stage event is triggered. This granularity is
 * a trade-off: a stage can be arbitrarily long, so a failure in the middle of a stage
 * loses the storage status updates for that stage.
*/
private[spark] class StorageStatusFetchSparkListener(
name: String,
sc: SparkContext)
extends UISparkListener(name) {
var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus

/**
* Fetch storage information from SparkEnv, which involves a query to the driver. This is
* expensive and should be invoked sparingly.
*/
def fetchStorageStatus() {
val storageStatus = sc.getExecutorStorageStatus
val event = new SparkListenerStorageStatusFetch(storageStatus)
onStorageStatusFetch(event)
}

/**
* Update local state with fetch result, and log the appropriate event
*/
protected def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
storageStatusList = storageStatusFetch.storageStatusList
logEvent(storageStatusFetch)
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
fetchStorageStatus()
logger.flush()
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
fetchStorageStatus()
logger.flush()
}
}
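A usage sketch of the class above, mirroring how BlockManagerUI.start() wires it up later
in this commit:

    val listener = new StorageStatusFetchSparkListener("block-manager-ui", sc)
    sc.addSparkListener(listener)
    // Each stage submission or completion now queries the driver for executor
    // storage status, logs it as JSON, and caches it in listener.storageStatusList.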
21 changes: 7 additions & 14 deletions core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -32,7 +32,6 @@ import org.apache.spark.ui.{UISparkListener, UIUtils}
import org.apache.spark.ui.Page.Environment

private[spark] class EnvironmentUI(sc: SparkContext) {

private var _listener: Option[EnvironmentListener] = None
def listener = _listener.get

@@ -45,9 +44,6 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("/environment", (request: HttpServletRequest) => render(request))
)

/**
* Render an HTML page that encodes environment information
*/
def render(request: HttpServletRequest): Seq[Node] = {
listener.loadEnvironment()
val runtimeInformationTable = UIUtils.listingTable(
@@ -76,17 +72,15 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>

/**
* A SparkListener that logs information to be displayed on the Environment UI.
* A SparkListener that prepares and logs information to be displayed on the Environment UI
*/
private[spark] class EnvironmentListener extends UISparkListener("environment-ui") {
var jvmInformation: Seq[(String, String)] = Seq()
var sparkProperties: Seq[(String, String)] = Seq()
var systemProperties: Seq[(String, String)] = Seq()
var classpathEntries: Seq[(String, String)] = Seq()

/**
* Gather JVM, spark, system and classpath properties
*/
/** Gather JVM, spark, system and classpath properties */
def loadEnvironment() = {
val jvmInformation = Seq(
("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
@@ -116,9 +110,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
onLoadEnvironment(loadEnvironment)
}

/**
* Prepare environment information for UI to render, and log the corresponding event
*/
/** Prepare environment information for UI to render, and log the corresponding event */
def onLoadEnvironment(loadEnvironment: SparkListenerLoadEnvironment) = {
jvmInformation = loadEnvironment.jvmInformation
sparkProperties = loadEnvironment.sparkProperties
@@ -128,8 +120,9 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
logger.flush()
}

override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()

override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()
override def onJobStart(jobStart: SparkListenerJobStart) = {
super.onJobStart(jobStart)
loadEnvironment()
}
}
}
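Note the shape of the new onJobStart override: the super call lets the base UISparkListener
open its per-job log file before the subclass writes anything. A minimal sketch of the
pattern, with MyListener as a hypothetical subclass:

    private[spark] class MyListener extends UISparkListener("my-ui") {
      override def onJobStart(jobStart: SparkListenerJobStart) = {
        super.onJobStart(jobStart)  // starts the FileLogger for this job
        // ...then gather and log initial state, as loadEnvironment() does above
      }
    }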
51 changes: 7 additions & 44 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -25,17 +25,14 @@ import scala.xml.Node
import org.eclipse.jetty.server.Handler

import org.apache.spark.{Logging, SparkContext, ExceptionFailure}
import org.apache.spark.scheduler._
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.{UISparkListener, UIUtils}
import org.apache.spark.ui.{StorageStatusFetchSparkListener, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SparkListenerTaskEnd
import org.apache.spark.scheduler.SparkListenerTaskStart
import org.apache.spark.storage.StorageStatus

private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {

private var _listener: Option[ExecutorsListener] = None
def listener = _listener.get

@@ -48,9 +45,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
("/executors", (request: HttpServletRequest) => render(request))
)

/**
* Render an HTML page that encodes executor information
*/
def render(request: HttpServletRequest): Seq[Node] = {
listener.fetchStorageStatus()
val storageStatusList = listener.storageStatusList
@@ -80,9 +74,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
}

/**
* Header fields in the executors table
*/
/** Header fields for the executors table */
private def execHeader = Seq(
"Executor ID",
"Address",
@@ -97,14 +89,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
"Shuffle Read",
"Shuffle Write")

/**
* Render an HTML table row representing an executor
*/
/** Render an HTML row representing an executor */
private def execRow(values: Map[String, String]): Seq[Node] = {
val maximumMemory = values("Maximum Memory")
val memoryUsed = values("Memory Used")
val diskUsed = values("Disk Used")

<tr>
<td>{values("Executor ID")}</td>
<td>{values("Address")}</td>
@@ -126,9 +115,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
</tr>
}

/**
* Represent an executor's info as a map given a storage status index
*/
/** Represent an executor's info as a map given a storage status index */
private def getExecInfo(statusId: Int): Map[String, String] = {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
@@ -169,40 +156,16 @@ private[spark] class ExecutorsUI(val sc: SparkContext) extends Logging {
}

/**
* A SparkListener that logs information to be displayed on the Executors UI
* A SparkListener that prepares and logs information to be displayed on the Executors UI
*/
private[spark] class ExecutorsListener extends UISparkListener("executors-ui") {
private[spark]
class ExecutorsListener extends StorageStatusFetchSparkListener("executors-ui", sc) {
val executorToTasksActive = mutable.HashMap[String, Int]()
val executorToTasksComplete = mutable.HashMap[String, Int]()
val executorToTasksFailed = mutable.HashMap[String, Int]()
val executorToDuration = mutable.HashMap[String, Long]()
val executorToShuffleRead = mutable.HashMap[String, Long]()
val executorToShuffleWrite = mutable.HashMap[String, Long]()
var storageStatusList: Seq[StorageStatus] = sc.getExecutorStorageStatus

def fetchStorageStatus() {
val event = new SparkListenerStorageStatusFetch(sc.getExecutorStorageStatus)
onStorageStatusFetch(event)
}

def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
storageStatusList = storageStatusFetch.storageStatusList
logEvent(storageStatusFetch)
}

override def onJobStart(jobStart: SparkListenerJobStart) = logger.start()

override def onJobEnd(jobEnd: SparkListenerJobEnd) = logger.close()

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
fetchStorageStatus()
logger.flush()
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
fetchStorageStatus()
logger.flush()
}

override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = formatExecutorId(taskStart.taskInfo.executorId)
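The per-executor maps above are keyed by executor ID and updated as task events arrive. A
hedged sketch of the bookkeeping this implies (onTaskEnd is truncated from this hunk, so
the decrement and failure logic shown is an assumption):

    override def onTaskStart(taskStart: SparkListenerTaskStart) {
      val eid = formatExecutorId(taskStart.taskInfo.executorId)
      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
    }
    // Presumably mirrored in onTaskEnd: decrement the active count, then bump
    // executorToTasksComplete or executorToTasksFailed depending on the outcome.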
core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -17,22 +17,29 @@

package org.apache.spark.ui.storage

import scala.concurrent.duration._

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Handler

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.StorageStatusFetchSparkListener

/** Web UI showing storage status of all RDDs in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)
private var _listener: Option[StorageStatusFetchSparkListener] = None
private val indexPage = new IndexPage(this)
private val rddPage = new RDDPage(this)

def listener = _listener.get

def start() {
_listener = Some(new StorageStatusFetchSparkListener("block-manager-ui", sc))
sc.addSparkListener(listener)
}

def getHandlers = Seq[(String, Handler)](
("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
("/storage", (request: HttpServletRequest) => indexPage.render(request))
)
}
}
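Each handler pair maps a URL to a responder function; the JettyUtils._ import supplies the
implicit conversion from HttpServletRequest => Seq[Node] to a Jetty Handler. Given the
commit's JSON theme, a hypothetical JSON endpoint could be registered the same way,
assuming JettyUtils also converts JValue responders (renderJson is illustrative, not part
of this commit):

    def getHandlers = Seq[(String, Handler)](
      ("/storage/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
      ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
      ("/storage", (request: HttpServletRequest) => indexPage.render(request))
    )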
30 changes: 17 additions & 13 deletions core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -28,26 +28,30 @@ import org.apache.spark.util.Utils

/** Page showing list of RDDs currently stored in the cluster */
private[spark] class IndexPage(parent: BlockManagerUI) {
val sc = parent.sc
private val sc = parent.sc

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
parent.listener.fetchStorageStatus()
val storageStatusList = parent.listener.storageStatusList

val rddHeaders = Seq(
"RDD Name",
"Storage Level",
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
"Size on Disk")
// Calculate macro-level statistics
val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
val content = listingTable(rddHeaders, rddRow, rdds)
val content = listingTable(rddHeader, rddRow, rdds)

headerSparkPage(content, parent.sc, "Storage ", Storage)
headerSparkPage(content, sc, "Storage ", Storage)
}

def rddRow(rdd: RDDInfo): Seq[Node] = {
/** Header fields for the RDD table */
private def rddHeader = Seq(
"RDD Name",
"Storage Level",
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
"Size on Disk")

/** Render an HTML row representing an RDD */
private def rddRow(rdd: RDDInfo): Seq[Node] = {
<tr>
<td>
<a href={"%s/storage/rdd?id=%s".format(prependBaseUri(),rdd.id)}>