Move SparkListenerBus out of DAGScheduler + Clean up
This PR introduces new SparkListenerEvents that are generated outside of DAGScheduler.
Instead of going through multiple layers (SparkContext -> DAGScheduler -> SparkListenerBus)
to post an event, we now post events directly to sc.listenerBus (SparkContext -> SparkListenerBus).
This commit also cleans up the initialization order of the UI and the schedulers in SparkContext,
as well as variable names in DAGScheduler.

Further, some tests create events with a null TaskInfo, which causes NPEs in certain UI listeners.
This is now fixed.
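
For illustration, a minimal sketch of what the new posting path means from a user's perspective: a listener registered through SparkContext.addSparkListener is now attached directly to sc.listenerBus, and events such as SparkListenerUnpersistRDD are posted to that bus by SparkContext itself (see the SparkContext.scala diff below). The listener class, the application setup, and the onUnpersistRDD callback are assumptions made for this example, not code taken from the commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerUnpersistRDD}

// Hypothetical listener: reacts to the unpersist events that SparkContext
// now posts straight to its listenerBus (SparkContext -> SparkListenerBus).
class UnpersistLogger extends SparkListener {
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
    println("Unpersisted RDD " + unpersistRDD.rddId)
  }
}

object ListenerExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("listener-example")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new UnpersistLogger)   // registered directly on sc.listenerBus
    val rdd = sc.parallelize(1 to 10).cache()
    rdd.count()
    rdd.unpersist()   // SparkContext posts SparkListenerUnpersistRDD to the bus
    sc.stop()
  }
}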
andrewor14 committed Mar 1, 2014
1 parent 5d2cec1 commit 2981d61
Showing 14 changed files with 327 additions and 307 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (50 changes: 30 additions & 20 deletions)
@@ -195,19 +195,22 @@ class SparkContext(
}
executorEnvs("SPARK_USER") = sparkUser

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new SparkListenerBus

// Start the UI before posting events to listener bus, because the UI listens for Spark events
ui.start()

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
taskScheduler.start()

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)

// Start the UI before the DAG scheduler, because the UI listens for Spark events
ui.start()

@volatile private[spark] var dagScheduler = new DAGScheduler(this)
dagScheduler.start()
dagScheduler.post(new SparkListenerApplicationStart(appName))

updateEnvironmentProperties()
// Post initialization events
postApplicationStartEvent()
postEnvironmentUpdateEvent()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
@@ -640,10 +643,11 @@ class SparkContext(
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdateEvent()
}

def addSparkListener(listener: SparkListener) {
dagScheduler.addSparkListener(listener)
listenerBus.addListener(listener)
}

/**
@@ -670,7 +674,7 @@
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

def getStageInfo: Map[Stage,StageInfo] = {
def getStageInfo: Map[Stage, StageInfo] = {
dagScheduler.stageToInfos
}

@@ -707,7 +711,7 @@ class SparkContext(
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("added files are now temporary files and need not be deleted manually", "1.0.0")
@deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
def clearFiles() {
addedFiles.clear()
}
@@ -736,7 +740,7 @@ class SparkContext(
val rddId = rdd.id
env.blockManager.master.removeRdd(rddId, blocking)
persistentRdds.remove(rddId)
dagScheduler.post(new SparkListenerUnpersistRDD(rddId))
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

/**
@@ -788,13 +792,14 @@ class SparkContext(
logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
}
}
postEnvironmentUpdateEvent()
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("added jars are now temporary files and need not be deleted manually", "1.0.0")
@deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
def clearJars() {
addedJars.clear()
}
@@ -1034,19 +1039,24 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/**
* Update environment properties and post the corresponding event to the DAG scheduler,
* if it is ready.
*/
private def updateEnvironmentProperties() {
Option(dagScheduler).foreach { scheduler =>
/** Post the application start event if the listener bus is ready */
private def postApplicationStartEvent() {
Option(listenerBus).foreach { bus =>
val applicationStart = SparkListenerApplicationStart(appName)
bus.post(applicationStart)
}
}

/** Post the environment update event if the listener bus is ready */
private def postEnvironmentUpdateEvent() {
Option(listenerBus).foreach { bus =>
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = new SparkListenerEnvironmentUpdate(environmentDetails)
scheduler.post(environmentUpdate)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
bus.post(environmentUpdate)
}
}
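
The null-TaskInfo fix mentioned in the commit message can be illustrated with a defensive listener: task-end events constructed by tests may carry no TaskInfo, and listeners that dereference it unconditionally throw an NPE. Below is a minimal sketch of such a guard, using the SparkListener onTaskEnd callback; the listener name and its behavior are illustrative only and are not the actual fix made in this commit.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative guard against task-end events that carry no TaskInfo
// (e.g. events built by tests), which previously caused NPEs in UI listeners.
class NullSafeTaskEndListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    Option(taskEnd.taskInfo) match {
      case Some(info) => println("Task finished on " + info.host)
      case None => // no task info attached; skip the event instead of throwing
    }
  }
}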

