Skip to content

Commit

Permalink
Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if…
Browse files Browse the repository at this point in the history
… timeout

TimeoutException is a more explicit failure. In addition, the caller doesn't need to call `assert` now.
  • Loading branch information
zsxwing committed Jun 1, 2015
1 parent 3c01568 commit 607674a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,22 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri

/**
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Return true if the queue has emptied and false is the specified time
* elapsed before the queue emptied.
* time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
* emptied.
*/
@VisibleForTesting
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
@throws(classOf[TimeoutException])
def waitUntilEmpty(timeoutMillis: Long): Unit = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
throw new TimeoutException(
s"The event queue is not empty after $timeoutMillis milliseconds")
}
/* Sleep rather than using wait/notify, because this is used only for testing and
* wait/notify add overhead in the general case. */
Thread.sleep(10)
}
true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
Expand Down Expand Up @@ -71,7 +71,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
val listener = listeners(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class DAGSchedulerSuite
test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}
Expand Down Expand Up @@ -389,7 +389,7 @@ class DAGSchedulerSuite
submit(unserializableRdd, Array(0))
assert(failure.getMessage.startsWith(
"Job aborted due to stage failure: Task not serializable:"))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
Expand All @@ -399,7 +399,7 @@ class DAGSchedulerSuite
submit(new MyRDD(sc, 1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
Expand All @@ -410,7 +410,7 @@ class DAGSchedulerSuite
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty()
Expand Down Expand Up @@ -462,7 +462,7 @@ class DAGSchedulerSuite
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.isEmpty)
assert(sparkListener.successfulStages.contains(0))
}
Expand Down Expand Up @@ -531,7 +531,7 @@ class DAGSchedulerSuite
Map[Long, Any](),
createFakeTaskInfo(),
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))

// The second ResultTask fails, with a fetch failure for the output from the second mapper.
Expand All @@ -543,7 +543,7 @@ class DAGSchedulerSuite
createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.size == 1)
}

Expand Down Expand Up @@ -592,7 +592,7 @@ class DAGSchedulerSuite

// Listener bus should get told about the map stage failing, but not the reduce stage
// (since the reduce stage hasn't been started yet).
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.toSet === Set(0))

assertDataStructuresEmpty()
Expand Down Expand Up @@ -643,7 +643,7 @@ class DAGSchedulerSuite
assert(cancelledStages.toSet === Set(0, 2))

// Make sure the listeners got told about both failed stages.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.successfulStages.isEmpty)
assert(sparkListener.failedStages.toSet === Set(0, 2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Starting listener bus should flush all buffered events
bus.start(sc)
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)

// After listener bus has stopped, posting events should not increment counter
Expand Down Expand Up @@ -131,7 +131,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

listener.stageInfos.size should be {1}
val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
Expand All @@ -156,7 +156,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd3.setName("Trois")

rdd1.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be {1}
val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
Expand All @@ -165,7 +165,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd2.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be {1}
val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD
Expand All @@ -174,7 +174,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd3.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD
Expand All @@ -190,7 +190,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val rdd2 = rdd1.map(_.toString)
sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

listener.stageInfos.size should be {1}
val (stageInfo, _) = listener.stageInfos.head
Expand All @@ -214,7 +214,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
d.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (1)

val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
Expand All @@ -225,7 +225,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
d4.setName("A Cogroup")
d4.collectAsMap()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (4)
listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
/**
Expand Down Expand Up @@ -281,7 +281,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
.reduce { case (x, y) => x }
assert(result === 1.to(akkaFrameSize).toArray)

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
Expand All @@ -297,7 +297,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.isEmpty)
Expand Down Expand Up @@ -352,7 +352,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// The exception should be caught, and the event should be propagated to other listeners
assert(bus.listenerThreadIsAlive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(listener.addedExecutorInfo.size == 2)
assert(listener.addedExecutorInfo("0").totalCores == 1)
assert(listener.addedExecutorInfo("1").totalCores == 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private object YarnClusterDriver extends Logging with Matchers {
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {
Expand Down

0 comments on commit 607674a

Please sign in to comment.