Skip to content

Commit

Permalink
Merge pull request apache#392 from rxin/listenerbus
Browse files Browse the repository at this point in the history
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.

Otherwise this leads to hundreds of SparkListenerBus daemon threads in our unit tests (and is also problematic if a user application launches multiple SparkContexts).
  • Loading branch information
rxin committed Jan 13, 2014
2 parents 288a878 + 2180c87 commit 82e2b92
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class DAGScheduler(

private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

private[spark] val listenerBus = new SparkListenerBus()
// An async scheduler event bus. The bus should be stopped when DAGScheduler is stopped.
private[spark] val listenerBus = new SparkListenerBus

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
Expand Down Expand Up @@ -1121,5 +1122,6 @@ class DAGScheduler(
}
metadataCleaner.cancel()
taskSched.stop()
listenerBus.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
/** Event signalling that a job has ended, carrying the job and its [[JobResult]]. */
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents

/**
 * A marker event posted to the bus to shut down the listener daemon thread:
 * on receiving it, the thread exits its event-processing loop.
 */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents

trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.Logging

/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
private[spark] class SparkListenerBus() extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
private[spark] class SparkListenerBus extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]

/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false

// Create a new daemon thread to listen for events. This thread is stopped when it receives
// a SparkListenerShutdown event, using the stop method.
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
Expand All @@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
sparkListeners.foreach(_.onTaskEnd(taskEnd))
case SparkListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ =>
}
}
Expand Down Expand Up @@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty()) {
while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
Expand All @@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
}
return true
}

/**
 * Stops the bus by posting [[SparkListenerShutdown]]; the daemon thread
 * terminates after draining events queued ahead of the shutdown marker.
 */
def stop(): Unit = post(SparkListenerShutdown)
}

0 comments on commit 82e2b92

Please sign in to comment.