From e0ba2f8e8c37adc4c57684b2a42aa20fc2e19b19 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 29 Apr 2014 13:25:04 -0700
Subject: [PATCH] Fix test failures caused by race condition in
 processing/mutating events

An event can be mutated by the DAGScheduler in between being processed
by one listener and being processed by another. This causes the
ReplayListenerSuite to be flaky. This commit ensures that the event
logged is the same as the original event received by the
EventLoggingListener.
---
 .../scheduler/EventLoggingListener.scala      |  15 ++-
 .../scheduler/EventLoggingListenerSuite.scala |   1 +
 .../spark/scheduler/ReplayListenerSuite.scala | 100 +++++++++---------
 3 files changed, 61 insertions(+), 55 deletions(-)
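
Note (illustration only, not part of the diff): the race above is easiest to
see in miniature. Every name in this sketch is invented; Spark's real listener
bus and events are more involved. A listener that holds on to the live event
object observes later mutations, while a snapshot serialized at logging time
does not:

    // Toy model of the race -- all names here are hypothetical.
    object MutationRaceSketch {
      // Stand-in for a SparkListenerEvent that the scheduler mutates in place.
      class Event(var status: String)

      def main(args: Array[String]): Unit = {
        val event = new Event("RUNNING")

        // Listener 1 keeps the event by reference (what the old EventKeeper did).
        val keptByReference = event

        // The scheduler mutates the event before listener 2 processes it.
        event.status = "FINISHED"

        // The kept reference has silently changed, so comparisons against it are flaky.
        assert(keptByReference.status == "FINISHED")

        // A snapshot taken at logging time (the patch snapshots the JSON form via
        // JsonProtocol.sparkEventToJson) is immune to later mutation.
        val fresh = new Event("RUNNING")
        val snapshot = s"""{"status":"${fresh.status}"}"""
        fresh.status = "FINISHED"
        assert(snapshot == """{"status":"RUNNING"}""")
      }
    }
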
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index e7780c6ed3f66..4b5bdca34ef56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -48,14 +50,18 @@ private[spark] class EventLoggingListener(
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
+  protected val logger =
     new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)
 
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
+
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
@@ -73,11 +79,14 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
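
For context (this sketch is not in the patch): with spark.eventLog.testing
set, every event that logEvent serializes is also appended to loggedEvents as
a json4s JValue. Roughly how a test can use the hook, assuming the patch is
applied; the code must live in the org.apache.spark.scheduler package because
loggedEvents is private[scheduler], and it reuses the two-argument constructor
that EventMonster uses later in this patch:

    package org.apache.spark.scheduler

    import org.apache.spark.SparkConf
    import org.apache.spark.util.JsonProtocol

    object LoggedEventsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.eventLog.enabled", "true")
          .set("spark.eventLog.testing", "true") // turns on the loggedEvents buffer

        val listener = new EventLoggingListener("sketch-app", conf)
        listener.start() // begin logging; assumes the default log dir is writable

        val event = SparkListenerApplicationEnd(System.currentTimeMillis())
        listener.onApplicationEnd(event)

        // The buffer holds the JSON snapshot taken at logging time, so later
        // mutation of `event` cannot affect what we compare against.
        assert(listener.loggedEvents.last == JsonProtocol.sparkEventToJson(event))
        listener.stop()
      }
    }
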
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index f202d94bbf7c7..a0af4becd721b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -380,6 +380,7 @@ object EventLoggingListenerSuite {
       compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
+    conf.set("spark.eventLog.testing", "true")
     logDir.foreach { dir =>
       conf.set("spark.eventLog.dir", dir)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 9ea0d5254178a..7c61210469a8f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.PrintWriter
 
-import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.util.{JsonProtocol, Utils}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
  * Test for whether ReplayListenerBus replays events from logs correctly.
@@ -41,7 +41,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   )
 
   after {
-
+    Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
+    Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
   }
 
   test("Simple replay") {
@@ -54,10 +55,18 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
+  // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
   }
 
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationReplay(Some(codec))
+    }
+  }
+
   /* ----------------- *
    * Actual test logic *
@@ -78,31 +87,37 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
     writer.close()
 
     val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
-    val eventKeeper = new EventKeeper
-    replayer.addListener(eventKeeper)
+    val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
     replayer.replay()
 
-    assert(eventKeeper.events.size === 2)
-    assert(eventKeeper.events(0) === applicationStart)
-    assert(eventKeeper.events(1) === applicationEnd)
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
   /**
+   * Test end-to-end replaying of events.
    *
+   * This test runs a few simple jobs with event logging enabled, and compares each emitted
+   * event to the corresponding event replayed from the event logs. This test makes the
+   * assumption that the event logging behavior is correct (tested in a separate suite).
   */
  private def testApplicationReplay(codecName: Option[String] = None) {
    val logDir = "/tmp/test-replay"
-    val logDirPath = new Path(logDir)
    val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
    val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
 
-    val eventKeeper = new EventKeeper
-    sc.addSparkListener(eventKeeper)
-    // Run a job
-    sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().cache().count()
+    // Run a few jobs
+    sc.parallelize(1 to 100, 1).count()
+    sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
+    sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
+    sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
    sc.stop()
 
-    // Find the log file
-    val applications = fileSystem.listStatus(logDirPath)
+    // Prepare information needed for replay
+    val codec = codecName.map(getCompressionCodec)
+    val applications = fileSystem.listStatus(new Path(logDir))
    assert(applications != null && applications.size > 0)
    val eventLogDir = applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last
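
An aside on the JSON-level assertions above (this sketch is not in the patch):
they lean on JsonProtocol round-tripping cleanly, that is, an event rendered to
a compact JSON line, parsed back, and re-serialized compares equal as a JValue.
A minimal sketch of that property for one event type:

    import org.json4s.jackson.JsonMethods._

    import org.apache.spark.scheduler.SparkListenerApplicationEnd
    import org.apache.spark.util.JsonProtocol

    object JsonRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val original = SparkListenerApplicationEnd(125L)

        // What EventLoggingListener writes: one compact JSON line per event.
        val line = compact(render(JsonProtocol.sparkEventToJson(original)))

        // What ReplayListenerBus does with each line it reads back.
        val replayed = JsonProtocol.sparkEventFromJson(parse(line))

        // Re-serializing the replayed event yields the same JSON as the original.
        assert(JsonProtocol.sparkEventToJson(replayed) ==
          JsonProtocol.sparkEventToJson(original))
      }
    }
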
@@ -111,53 +126,34 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     assert(logFiles != null && logFiles.size > 0)
     val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
     assert(logFile.isDefined)
-    val codec = codecName.map(getCompressionCodec)
 
     // Replay events
     val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec)
-    val replayEventKeeper = new EventKeeper
-    replayer.addListener(replayEventKeeper)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
     replayer.replay()
 
     // Verify the same events are replayed in the same order
-    val filteredEvents = filterSchedulerEvents(eventKeeper.events)
-    val filteredReplayEvents = filterSchedulerEvents(replayEventKeeper.events)
-    assert(filteredEvents.size === filteredReplayEvents.size)
-    filteredEvents.zip(filteredReplayEvents).foreach { case (e1, e2) =>
-      assert(JsonProtocol.sparkEventToJson(e1) === JsonProtocol.sparkEventToJson(e2))
-    }
+    assert(sc.eventLogger.isDefined)
+    val originalEvents = sc.eventLogger.get.loggedEvents
+    val replayedEvents = eventMonster.loggedEvents
+    originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
   }
 
   /**
-   * A simple listener that keeps all events it receives
+   * A simple listener that buffers all the events it receives.
+   *
+   * The event buffering functionality must be implemented within EventLoggingListener itself.
+   * This is because of the following race condition: the event may be mutated between being
+   * processed by one listener and being processed by another. Thus, in order to establish
+   * a fair comparison between the original events and the replayed events, both functionalities
+   * must be implemented within one listener (i.e. the EventLoggingListener).
+   *
+   * This child listener inherits only the event buffering functionality, but does not actually
+   * log the events.
   */
-  private class EventKeeper extends SparkListener {
-    val events = new ArrayBuffer[SparkListenerEvent]
-    override def onStageSubmitted(e: SparkListenerStageSubmitted) { events += e }
-    override def onStageCompleted(e: SparkListenerStageCompleted) { events += e }
-    override def onTaskStart(e: SparkListenerTaskStart) { events += e }
-    override def onTaskGettingResult(e: SparkListenerTaskGettingResult) { events += e }
-    override def onTaskEnd(e: SparkListenerTaskEnd) { events += e }
-    override def onJobStart(e: SparkListenerJobStart) { events += e }
-    override def onJobEnd(e: SparkListenerJobEnd) { events += e }
-    override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) { events += e }
-    override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = { events += e }
-    override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = { events += e }
-    override def onUnpersistRDD(e: SparkListenerUnpersistRDD) { events += e }
-    override def onApplicationStart(e: SparkListenerApplicationStart) { events += e }
-    override def onApplicationEnd(e: SparkListenerApplicationEnd) { events += e }
-  }
-
-  private def filterSchedulerEvents(events: Seq[SparkListenerEvent]): Seq[SparkListenerEvent] = {
-    events.collect {
-      case e: SparkListenerStageSubmitted => e
-      case e: SparkListenerStageCompleted => e
-      case e: SparkListenerTaskStart => e
-      case e: SparkListenerTaskGettingResult => e
-      case e: SparkListenerTaskEnd => e
-      case e: SparkListenerJobStart => e
-      case e: SparkListenerJobEnd => e
-    }
+  private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+    logger.close()
   }
 
   private def getCompressionCodec(codecName: String) = {
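
A closing note on EventMonster (illustration, not in the patch): calling
logger.close() in the constructor works because writes to a closed PrintWriter
are swallowed rather than thrown, so the inherited logEvent keeps buffering
into loggedEvents while nothing reaches disk. A minimal model of that
behavior, with hypothetical names, assuming FileLogger ultimately writes
through a PrintWriter:

    import java.io.{PrintWriter, StringWriter}

    import scala.collection.mutable.ArrayBuffer

    object EventMonsterModel {
      def main(args: Array[String]): Unit = {
        val writer = new PrintWriter(new StringWriter)
        val buffered = new ArrayBuffer[String]

        writer.close() // what EventMonster does to its logger up front

        val line = """{"Event":"SparkListenerApplicationEnd","Timestamp":125}"""
        writer.println(line) // quietly dropped: PrintWriter only sets an error flag
        buffered += line     // the buffering path still records the event

        assert(writer.checkError()) // the write failed silently
        assert(buffered.size == 1)  // but the event snapshot was captured
      }
    }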