Skip to content

Commit

Permalink
Clarify LiveListenerBus behavior + Add tests for new behavior
Browse files Browse the repository at this point in the history
The new behavior being the buffering of events before the bus is started.
  • Loading branch information
andrewor14 committed Mar 18, 2014
1 parent f80bd31 commit 124429f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render (request), master.securityMgr)
(request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
)
}

Expand Down Expand Up @@ -100,5 +100,5 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
}

private[spark] object MasterWebUI {
val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, ServerInfo, UIUtils}
import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

Expand Down Expand Up @@ -201,6 +201,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
}

private[spark] object WorkerWebUI {
val STATIC_RESOURCE_BASE = "org/apache/spark/ui"
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
val DEFAULT_PORT="8081"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,34 @@ import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.Logging

/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
*
* Until start() is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {

/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false

/**
* Create a new daemon thread to listen for events. Until this thread has started, all posted
* events are buffered. Only after this is called will the buffered events be released to all
* attached listeners.
* Start sending events to attached listeners.
*
* This thread is stopped when it receives a SparkListenerShutdown event, using the stop method.
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*/
def start() {
if (started) {
throw new IllegalStateException("Listener bus already started!")
}
started = true
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
Expand Down Expand Up @@ -81,5 +92,10 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
true
}

def stop(): Unit = post(SparkListenerShutdown)
def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
}
}
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.ui

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
System.clearProperty("spark.akka.frameSize")
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
bus.start()
Thread.sleep(1000)
assert(counter.count === 5)

// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
assert(counter.count === 5)

// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus
bus.start()
bus.start()
}

// ... or stopped before starting
intercept[IllegalStateException] {
val bus = new LiveListenerBus
bus.stop()
}
}

test("basic creation of StageInfo") {
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
Expand Down Expand Up @@ -207,6 +240,11 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
assert(m.sum / m.size.toDouble > 0.0, msg)
}

class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

class SaveStageAndTaskInfo extends SparkListener {
val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
Expand Down

0 comments on commit 124429f

Please sign in to comment.