[SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

The fleeting nature of the Spark Web UI has long been a problem reported by many users: the existing Web UI disappears as soon as the associated application terminates. This is because SparkUI is tightly coupled to SparkContext and cannot be instantiated independently of it. To solve this, some state must be saved to persistent storage while the application is still running.

The approach taken by this PR involves persisting the UI state through SparkListenerEvents. This requires a major refactor of the SparkListener interface because existing events (1) maintain deep references, making de/serialization difficult, and (2) do not encode all the information displayed on the UI. In this design, each existing listener for the UI (e.g. ExecutorsListener) maintains state that can be fully constructed from SparkListenerEvents. This state is then supplied to the parent UI (e.g. ExecutorsUI), which renders the associated page(s) on demand.
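
To make the pattern concrete, here is a minimal, hedged sketch of a listener whose state is built purely from SparkListenerEvents. The class and field names are illustrative, and the exact shape of SparkListenerTaskEnd/TaskInfo is assumed from the refactored events; the real UI listeners such as ExecutorsListener track much more information, but follow the same principle:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: all state is derived from events, so it can later be rebuilt
// simply by replaying a log of those same events.
class TaskCountListener extends SparkListener {
  // executor ID -> number of completed tasks, constructed exclusively from events
  val completedTasksPerExecutor = mutable.HashMap[String, Int]().withDefaultValue(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    completedTasksPerExecutor(taskEnd.taskInfo.executorId) += 1
  }
}
```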

This PR introduces two important classes: the **EventLoggingListener** and the **ReplayListenerBus**. In a live application, SparkUI registers an EventLoggingListener with the SparkContext in addition to the existing listeners. Over the course of the application, this listener serializes and logs all events to persistent storage. Then, after the application has finished, the SparkUI can be revived by replaying all the logged events to the existing UI listeners through the ReplayListenerBus.
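
The following is a hedged sketch of that wiring, written as if inside the org.apache.spark package. The constructor EventLoggingListener(appName, conf) and the LiveListenerBus calls mirror the SparkContext diff further below; the helper object and the commented replay-side constructor arguments are assumptions, not the exact API:

```scala
package org.apache.spark

import org.apache.spark.scheduler.{EventLoggingListener, LiveListenerBus, SparkListener}

// Hypothetical helper: register the UI listeners plus, optionally, an
// EventLoggingListener that serializes and logs every event posted to the bus.
object EventLogWiring {
  def wireUp(appName: String, conf: SparkConf, uiListeners: Seq[SparkListener]): LiveListenerBus = {
    val bus = new LiveListenerBus
    uiListeners.foreach(bus.addListener)  // listeners backing the live SparkUI pages
    if (conf.getBoolean("spark.eventLog.enabled", false)) {
      bus.addListener(new EventLoggingListener(appName, conf))  // persists events as they arrive
    }
    bus.start()  // begin releasing events to all registered listeners
    bus
  }

  // Replay side (assumed shape): after the application finishes, a ReplayListenerBus is
  // pointed at the persisted event logs, the same UI listener classes are registered on
  // it, and replaying the log rebuilds their state so the SparkUI can be served again.
  //   val replayBus = new ReplayListenerBus(logPaths, fileSystem, compressionCodec)
  //   uiListeners.foreach(replayBus.addListener)
  //   replayBus.replay()
}
```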

This feature is currently integrated with the Master Web UI, which optionally rebuilds a SparkUI from event logs as soon as the corresponding application finishes.
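
For example, a user might opt in with something like the following (hedged: "spark.eventLog.enabled" matches the SparkContext diff below, while the directory setting stands in for the "few configurable options" added to event logging and its exact name is an assumption):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed usage: enable event logging so that, once this application finishes,
// the Master Web UI can rebuild its SparkUI by replaying the persisted events.
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("spark://master:7077")
  .set("spark.eventLog.enabled", "true")                           // matches the diff below
  .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-events")  // assumed config name
val sc = new SparkContext(conf)
```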

More details can be found in the commit messages, comments within the code, and the [design doc](https://spark-project.atlassian.net/secure/attachment/12900/PersistingSparkWebUI.pdf). Comments and feedback are most welcome.

Author: Andrew Or <[email protected]>
Author: andrewor14 <[email protected]>

Closes #42 from andrewor14/master and squashes the following commits:

e5f14fa [Andrew Or] Merge github.com:apache/spark
a1c5cd9 [Andrew Or] Merge github.com:apache/spark
b8ba817 [Andrew Or] Remove UI from map when removing application in Master
83af656 [Andrew Or] Scraps and pieces (no functionality change)
222adcd [Andrew Or] Merge github.com:apache/spark
124429f [Andrew Or] Clarify LiveListenerBus behavior + Add tests for new behavior
f80bd31 [Andrew Or] Simplify static handler and BlockManager status update logic
9e14f97 [Andrew Or] Moved around functionality + renamed classes per Patrick
6740e49 [Andrew Or] Fix comment nits
650eb12 [Andrew Or] Add unit tests + Fix bugs found through tests
45fd84c [Andrew Or] Remove now deprecated test
c5c2c8f [Andrew Or] Remove list of (TaskInfo, TaskMetrics) from StageInfo
3456090 [Andrew Or] Address Patrick's comments
bf80e3d [Andrew Or] Imports, comments, and code formatting, once again (minor)
ac69ec8 [Andrew Or] Fix test fail
d801d11 [Andrew Or] Merge github.com:apache/spark (major)
dc93915 [Andrew Or] Imports, comments, and code formatting (minor)
77ba283 [Andrew Or] Address Kay's and Patrick's comments
b6eaea7 [Andrew Or] Treating SparkUI as a handler of MasterUI
d59da5f [Andrew Or] Avoid logging all the blocks on each executor
d6e3b4a [Andrew Or] Merge github.com:apache/spark
ca258a4 [Andrew Or] Master UI - add support for reading compressed event logs
176e68e [Andrew Or] Fix deprecated message for JavaSparkContext (minor)
4f69c4a [Andrew Or] Master UI - Rebuild SparkUI on application finish
291b2be [Andrew Or] Correct directory in log message "INFO: Logging events to <dir>"
1ba3407 [Andrew Or] Add a few configurable options to event logging
e375431 [Andrew Or] Add new constructors for SparkUI
18b256d [Andrew Or] Refactor out event logging and replaying logic from UI
bb4c503 [Andrew Or] Use a more mnemonic path for logging
aef411c [Andrew Or] Fix bug: storage status was not reflected on UI in the local case
03eda0b [Andrew Or] Fix HDFS flush behavior
36b3e5d [Andrew Or] Add HDFS support for event logging
cceff2b [andrewor14] Fix 100 char format fail
2fee310 [Andrew Or] Address Patrick's comments
2981d61 [Andrew Or] Move SparkListenerBus out of DAGScheduler + Clean up
5d2cec1 [Andrew Or] JobLogger: ID -> Id
0503e4b [Andrew Or] Fix PySpark tests + remove sc.clearFiles/clearJars
4d2fb0c [Andrew Or] Fix format fail
faa113e [Andrew Or] General clean up
d47585f [Andrew Or] Clean up FileLogger
472fd8a [Andrew Or] Fix a couple of tests
996d7a2 [Andrew Or] Reflect RDD unpersist on UI
7b2f811 [Andrew Or] Guard against TaskMetrics NPE + Fix tests
d1f4285 [Andrew Or] Migrate from lift-json to json4s-jackson
28019ca [Andrew Or] Merge github.com:apache/spark
bbe3501 [Andrew Or] Embed storage status and RDD info in Task events
6631c02 [Andrew Or] More formatting changes, this time mainly for Json DSL
70e7e7a [Andrew Or] Formatting changes
e9e1c6d [Andrew Or] Move all JSON de/serialization logic to JsonProtocol
d646df6 [Andrew Or] Completely decouple SparkUI from SparkContext
6814da0 [Andrew Or] Explicitly register each UI listener rather than through some magic
64d2ce1 [Andrew Or] Fix BlockManagerUI bug by introducing new event
4273013 [Andrew Or] Add a gateway SparkListener to simplify event logging
904c729 [Andrew Or] Fix another major bug
5ac906d [Andrew Or] Mostly naming, formatting, and code style changes
3fd584e [Andrew Or] Fix two major bugs
f3fc13b [Andrew Or] General refactor
4dfcd22 [Andrew Or] Merge git://git.apache.org/incubator-spark into persist-ui
b3976b0 [Andrew Or] Add functionality of reconstructing a persisted UI from SparkContext
8add36b [Andrew Or] JobProgressUI: Add JSON functionality
d859efc [Andrew Or] BlockManagerUI: Add JSON functionality
c4cd480 [Andrew Or] Also deserialize new events
8a2ebe6 [Andrew Or] Fix bugs for EnvironmentUI and ExecutorsUI
de8a1cd [Andrew Or] Serialize events both to and from JSON (rather than just to)
bf0b2e9 [Andrew Or] ExecutorUI: Serialize events rather than arbitary executor information
bb222b9 [Andrew Or] ExecutorUI: render completely from JSON
dcbd312 [Andrew Or] Add JSON Serializability for all SparkListenerEvent's
10ed49d [Andrew Or] Merge github.com:apache/incubator-spark into persist-ui
8e09306 [Andrew Or] Use JSON for ExecutorsUI
e3ae35f [Andrew Or] Merge github.com:apache/incubator-spark
3ddeb7e [Andrew Or] Also privatize fields
090544a [Andrew Or] Privatize methods
13920c9 [Andrew Or] Update docs
bd5a1d7 [Andrew Or] Typo: phyiscal -> physical
287ef44 [Andrew Or] Avoid reading the entire batch into memory; also simplify streaming logic
3df7005 [Andrew Or] Merge branch 'master' of github.com:andrewor14/incubator-spark
a531d2e [Andrew Or] Relax assumptions on compressors and serializers when batching
164489d [Andrew Or] Relax assumptions on compressors and serializers when batching
andrewor14 authored and pwendell committed Mar 19, 2014
1 parent ab747d3 commit 79d07d6
Showing 84 changed files with 4,268 additions and 1,944 deletions.
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,8 +17,6 @@

package org.apache.spark

import scala.{Option, deprecated}

import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
76 changes: 45 additions & 31 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,11 +20,12 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}

/** Spark class responsible for passing RDDs split contents to the BlockManager and making
sure a node doesn't load two copies of an RDD at once.
*/
/**
* Spark class responsible for passing RDDs split contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
@@ -49,11 +50,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
try {loading.wait()} catch {case _ : Throwable =>}
}
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
// because it's unlikely that two threads would work on the same RDD partition. One
// downside of the current code is that threads wait serially if this does happen.
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
* because it's unlikely that two threads would work on the same RDD partition. One
* downside of the current code is that threads wait serially if this does happen. */
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
@@ -69,32 +70,45 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
if (storageLevel.useDisk && !storageLevel.useMemory) {
// In the case that this RDD is to be persisted using DISK_ONLY
// the iterator will be passed directly to the blockManager (rather then
// caching it to an ArrayBuffer first), then the resulting block data iterator
// will be passed back to the user. If the iterator generates a lot of data,
// this means that it doesn't all have to be held in memory at one time.
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
// blocks aren't dropped by the block store before enabling that.
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
return blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")

// Keep track of blocks with updated statuses
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* In the case that this RDD is to be persisted using DISK_ONLY
* the iterator will be passed directly to the blockManager (rather then
* caching it to an ArrayBuffer first), then the resulting block data iterator
* will be passed back to the user. If the iterator generates a lot of data,
* this means that it doesn't all have to be held in memory at one time.
* This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
* blocks aren't dropped by the block store before enabling that. */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
}

// Update task metrics to include any blocks whose storage status is updated
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)

returnValue

} finally {
loading.synchronized {
loading.remove(key)
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,13 +18,13 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil

/**
* Spark class responsible for security.
*
93 changes: 72 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -132,14 +132,18 @@ class SparkContext(

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal)
isLocal = isLocal,
listenerBus = listenerBus)
SparkEnv.set(env)

// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -151,9 +155,26 @@
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI
// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
listenerBus.addListener(logger)
Some(logger)
} else None
}

// Information needed to replay logged events, if any
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

@@ -200,13 +221,13 @@
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
taskScheduler.start()

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
@volatile private[spark] var dagScheduler = new DAGScheduler(this)
dagScheduler.start()

ui.start()
postEnvironmentUpdate()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
@@ -571,7 +592,6 @@
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}


protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
@@ -641,10 +661,11 @@
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
}

def addSparkListener(listener: SparkListener) {
dagScheduler.addSparkListener(listener)
listenerBus.addListener(listener)
}

/**
@@ -671,7 +692,7 @@
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

def getStageInfo: Map[Stage,StageInfo] = {
def getStageInfo: Map[Stage, StageInfo] = {
dagScheduler.stageToInfos
}

@@ -698,7 +719,7 @@
}

/**
* Return current scheduling mode
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
@@ -708,6 +729,7 @@
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
def clearFiles() {
addedFiles.clear()
}
@@ -722,6 +744,23 @@
dagScheduler.getPreferredLocs(rdd, partition)
}

/**
* Register an RDD to be persisted in memory and/or disk storage
*/
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}

/**
* Unpersist an RDD from memory and/or disk storage
*/
private[spark] def unpersistRDD(rdd: RDD[_], blocking: Boolean = true) {
val rddId = rdd.id
env.blockManager.master.removeRdd(rddId, blocking)
persistentRdds.remove(rddId)
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
@@ -744,15 +783,15 @@
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addjars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
env.httpFileServer.addJar(new File(fileName))
} catch {
case e: Exception => {
// For now just log an error but allow to go through so spark examples work.
// The spark examples don't really need the jar distributed since its also
// The spark examples don't really need the jar distributed since its also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
@@ -773,32 +812,33 @@
logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
}
}
postEnvironmentUpdate()
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
def clearJars() {
addedJars.clear()
}

/** Shut down the SparkContext. */
def stop() {
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
dagScheduler = null
if (dagSchedulerCopy != null) {
metadataCleaner.cancel()
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
// Clean up locally linked files
clearFiles()
clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
@@ -1026,6 +1066,19 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
}

/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
@@ -1189,9 +1242,7 @@ object SparkContext extends Logging {
}

/** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
: TaskScheduler =
{
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1230,7 +1281,7 @@ object SparkContext extends Logging {
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
scheduler

@@ -1247,7 +1298,7 @@
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
@@ -1307,9 +1358,9 @@ object SparkContext extends Logging {
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
new CoarseMesosSchedulerBackend(scheduler, sc, url)
} else {
new MesosSchedulerBackend(scheduler, sc, url, appName)
new MesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
scheduler
