Fix test failures caused by race condition in processing/mutating events
An event can be mutated by the DAGScheduler in between being processed by one
listener and being processed by another. This causes the ReplayListenerSuite
to be flaky. This commit ensures that the event logged is the same as the
original event received by the EventLoggingListener.
andrewor14 committed Apr 29, 2014
1 parent b990453 commit e0ba2f8
Showing 3 changed files with 61 additions and 55 deletions.
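
The crux of the fix, visible in the EventLoggingListener diff below, is to serialize each event to JSON at the moment it is logged and buffer that immutable snapshot, so a later mutation of the event object cannot change what is compared during replay. A minimal standalone sketch of the idea, using hypothetical toy types (StageInfo, StageSubmitted, SnapshottingListener) rather than Spark's actual listener API:

import scala.collection.mutable.ArrayBuffer

// Toy stand-ins for a mutable, scheduler-owned event (hypothetical, not Spark classes).
case class StageInfo(var numTasks: Int)
case class StageSubmitted(info: StageInfo)

class SnapshottingListener {
  // Buffer an immutable rendering of the event instead of a reference to the event itself.
  val snapshots = ArrayBuffer[String]()
  def onEvent(e: StageSubmitted): Unit = snapshots += s"numTasks=${e.info.numTasks}"
}

object RaceSketch extends App {
  val listener = new SnapshottingListener
  val event = StageSubmitted(StageInfo(numTasks = 4))
  listener.onEvent(event)   // snapshot taken while the event is being processed
  event.info.numTasks = 8   // the scheduler mutates the event afterwards
  // The buffered snapshot still reflects the event as originally logged,
  // which is what the updated EventLoggingListener achieves by buffering the JSON.
  assert(listener.snapshots.head == "numTasks=4")
}
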
EventLoggingListener.scala
@@ -18,9 +18,11 @@
package org.apache.spark.scheduler

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -48,14 +50,18 @@ private[spark] class EventLoggingListener(

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name

private val logger =
protected val logger =
new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)

// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
@@ -73,11 +79,14 @@ private[spark] class EventLoggingListener(

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
logger.logLine(eventJson)
val eventJson = JsonProtocol.sparkEventToJson(event)
logger.logLine(compact(render(eventJson)))
if (flushLogger) {
logger.flush()
}
if (testing) {
loggedEvents += eventJson
}
}

// Events that do not trigger a flush
EventLoggingListenerSuite.scala
@@ -380,6 +380,7 @@ object EventLoggingListenerSuite {
compressionCodec: Option[String] = None) = {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
logDir.foreach { dir =>
conf.set("spark.eventLog.dir", dir)
}
ReplayListenerSuite.scala
@@ -19,16 +19,16 @@ package org.apache.spark.scheduler

import java.io.PrintWriter

import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.{JsonProtocol, Utils}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}

/**
* Test for whether ReplayListenerBus replays events from logs correctly.
@@ -41,7 +41,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
)

after {

Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
}

test("Simple replay") {
@@ -54,10 +55,18 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
}
}

// This assumes the correctness of EventLoggingListener
test("End-to-end replay") {
testApplicationReplay()
}

// This assumes the correctness of EventLoggingListener
test("End-to-end replay with compression") {
allCompressionCodecs.foreach { codec =>
testApplicationReplay(Some(codec))
}
}


/* ----------------- *
* Actual test logic *
@@ -78,31 +87,37 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
writer.close()
val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
val eventKeeper = new EventKeeper
replayer.addListener(eventKeeper)
val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
val eventMonster = new EventMonster(conf)
replayer.addListener(eventMonster)
replayer.replay()
assert(eventKeeper.events.size === 2)
assert(eventKeeper.events(0) === applicationStart)
assert(eventKeeper.events(1) === applicationEnd)
assert(eventMonster.loggedEvents.size === 2)
assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
}

/**
* Test end-to-end replaying of events.
*
* This test runs a few simple jobs with event logging enabled, and compares each emitted
* event to the corresponding event replayed from the event logs. This test makes the
* assumption that the event logging behavior is correct (tested in a separate suite).
*/
private def testApplicationReplay(codecName: Option[String] = None) {
val logDir = "/tmp/test-replay"
val logDirPath = new Path(logDir)
val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
val eventKeeper = new EventKeeper
sc.addSparkListener(eventKeeper)

// Run a job
sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().cache().count()
// Run a few jobs
sc.parallelize(1 to 100, 1).count()
sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
sc.stop()

// Find the log file
val applications = fileSystem.listStatus(logDirPath)
// Prepare information needed for replay
val codec = codecName.map(getCompressionCodec)
val applications = fileSystem.listStatus(new Path(logDir))
assert(applications != null && applications.size > 0)
val eventLogDir =
applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last
@@ -111,53 +126,34 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
assert(logFiles != null && logFiles.size > 0)
val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
assert(logFile.isDefined)
val codec = codecName.map(getCompressionCodec)

// Replay events
val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec)
val replayEventKeeper = new EventKeeper
replayer.addListener(replayEventKeeper)
val eventMonster = new EventMonster(conf)
replayer.addListener(eventMonster)
replayer.replay()

// Verify the same events are replayed in the same order
val filteredEvents = filterSchedulerEvents(eventKeeper.events)
val filteredReplayEvents = filterSchedulerEvents(replayEventKeeper.events)
assert(filteredEvents.size === filteredReplayEvents.size)
filteredEvents.zip(filteredReplayEvents).foreach { case (e1, e2) =>
assert(JsonProtocol.sparkEventToJson(e1) === JsonProtocol.sparkEventToJson(e2))
}
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents
val replayedEvents = eventMonster.loggedEvents
originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
}

/**
* A simple listener that keeps all events it receives
* A simple listener that buffers all the events it receives.
*
* The event buffering functionality must be implemented within EventLoggingListener itself.
* This is because of the following race condition: the event may be mutated between being
* processed by one listener and being processed by another. Thus, in order to establish
* a fair comparison between the original events and the replayed events, both functionalities
* must be implemented within one listener (i.e. the EventLoggingListener).
*
* This child listener inherits only the event buffering functionality, but does not actually
* log the events.
*/
private class EventKeeper extends SparkListener {
val events = new ArrayBuffer[SparkListenerEvent]
override def onStageSubmitted(e: SparkListenerStageSubmitted) { events += e }
override def onStageCompleted(e: SparkListenerStageCompleted) { events += e }
override def onTaskStart(e: SparkListenerTaskStart) { events += e }
override def onTaskGettingResult(e: SparkListenerTaskGettingResult) { events += e }
override def onTaskEnd(e: SparkListenerTaskEnd) { events += e }
override def onJobStart(e: SparkListenerJobStart) { events += e }
override def onJobEnd(e: SparkListenerJobEnd) { events += e }
override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) { events += e }
override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = { events += e }
override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = { events += e }
override def onUnpersistRDD(e: SparkListenerUnpersistRDD) { events += e }
override def onApplicationStart(e: SparkListenerApplicationStart) { events += e }
override def onApplicationEnd(e: SparkListenerApplicationEnd) { events += e }
}

private def filterSchedulerEvents(events: Seq[SparkListenerEvent]): Seq[SparkListenerEvent] = {
events.collect {
case e: SparkListenerStageSubmitted => e
case e: SparkListenerStageCompleted => e
case e: SparkListenerTaskStart => e
case e: SparkListenerTaskGettingResult => e
case e: SparkListenerTaskEnd => e
case e: SparkListenerJobStart => e
case e: SparkListenerJobEnd => e
}
private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
logger.close()
}

private def getCompressionCodec(codecName: String) = {
