Skip to content

Commit

Permalink
Clean up EventLoggingListenerSuite + add comments
Browse files: browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 11, 2014
1 parent e12f4b1 commit 5d38ffe
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
* Post an event to all attached listeners. This does nothing if the event is
* SparkListenerShutdown.
*/
protected def postToAll(event: SparkListenerEvent) {
def postToAll(event: SparkListenerEvent) {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.{JsonProtocol, Utils}
import org.apache.spark.io.CompressionCodec

/**
* Test for whether EventLoggingListener logs events properly.
*
* This checks whether special files are created using the specified configurations, and whether
* logged events can be read back into memory as expected.
* This tests whether EventLoggingListener actually creates special files while logging events,
* whether the parsing of these special files is correct, and whether the logged events can be
* read and deserialized into actual SparkListenerEvents.
*/
class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

private val fileSystem = Utils.getHadoopFileSystem("/")
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
Expand Down Expand Up @@ -79,6 +78,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
}


/* ----------------- *
* Actual test logic *
* ----------------- */

/**
* Test whether names of special files are correctly identified and parsed.
*/
Expand Down Expand Up @@ -182,7 +185,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
val conf = getLoggingConf(logDirPath, compressionCodec)
val eventLogger = new EventLoggingListener("test", conf)
eventLogger.start()
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
assertInfoCorrect(eventLoggingInfo, loggerStopped = false)

Expand Down Expand Up @@ -215,7 +217,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
listenerBus.postToAll(applicationEnd)

// Verify file contains exactly the two events logged
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
assert(eventLoggingInfo.logPaths.size > 0)
val fileStream = {
Expand All @@ -235,86 +236,101 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

/**
* Test end-to-end event logging functionality in an application.
* This runs a simple Spark job and asserts that the expected events are logged when expected.
*/
private def testApplicationEventLogging(
logDirPath: Option[String] = None,
compressionCodec: Option[String] = None) {

val conf = getLoggingConf(logDirPath, compressionCodec)
val sc = new SparkContext("local", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
assert(eventLogger.logDir.startsWith(expectedLogDir))

// Assert all specified events are found in the event log
def assertEventExists(events: Seq[String]) {
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
val logPath = eventLoggingInfo.logPaths.head
val fileStream = {
val stream = fileSystem.open(logPath)
eventLoggingInfo.compressionCodec.map { codec =>
codec.compressedInputStream(stream)
}.getOrElse(stream)
}
val lines = Source.fromInputStream(fileStream).getLines()
val eventSet = mutable.Set(events: _*)
lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event) &&
JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
eventSet.remove(event)
}
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
sc.addSparkListener(eventExistenceListener)

// Trigger asserts for whether the expected events are actually logged
sc.parallelize(1 to 10000).count()
sc.stop()

// Ensure all asserts have actually been triggered
eventExistenceListener.assertAllCallbacksInvoked()
}

/**
* Assert that all of the specified events are logged by the given EventLoggingListener.
*/
private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
val logPath = eventLoggingInfo.logPaths.head
val fileStream = {
val stream = fileSystem.open(logPath)
eventLoggingInfo.compressionCodec.map { codec =>
codec.compressedInputStream(stream)
}.getOrElse(stream)
}
val lines = Source.fromInputStream(fileStream).getLines()
val eventSet = mutable.Set(events: _*)
lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event) &&
JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
eventSet.remove(event)
}
}
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}

// SparkListenerEvents are posted in a separate thread
class AssertEventListener extends SparkListener {
var jobStarted = false
var jobEnded = false
var appEnded = false

override def onJobStart(jobStart: SparkListenerJobStart) {
assertEventExists(Seq[String](
"SparkListenerApplicationStart",
"SparkListenerBlockManagerAdded",
"SparkListenerEnvironmentUpdate"
))
jobStarted = true
}
/**
* A listener that asserts certain events are logged by the given EventLoggingListener.
* This is necessary because events are posted asynchronously in a different thread.
*/
private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
var jobStarted = false
var jobEnded = false
var appEnded = false

override def onJobStart(jobStart: SparkListenerJobStart) {
assertEventExists(eventLogger, Seq[String](
"SparkListenerApplicationStart",
"SparkListenerBlockManagerAdded",
"SparkListenerEnvironmentUpdate"
))
jobStarted = true
}

override def onJobEnd(jobEnd: SparkListenerJobEnd) {
assertEventExists(Seq[String](
"SparkListenerJobStart",
"SparkListenerJobEnd",
"SparkListenerStageSubmitted",
"SparkListenerStageCompleted",
"SparkListenerTaskStart",
"SparkListenerTaskEnd"
))
jobEnded = true
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
assertEventExists(eventLogger, Seq[String](
"SparkListenerJobStart",
"SparkListenerJobEnd",
"SparkListenerStageSubmitted",
"SparkListenerStageCompleted",
"SparkListenerTaskStart",
"SparkListenerTaskEnd"
))
jobEnded = true
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
assertEventExists(Seq[String]("SparkListenerApplicationEnd"))
appEnded = true
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
appEnded = true
}
val assertEventListener = new AssertEventListener
sc.addSparkListener(assertEventListener)

// Trigger callbacks
sc.parallelize(1 to 10000).count()
sc.stop()
assert(assertEventListener.jobStarted, "JobStart callback not invoked!")
assert(assertEventListener.jobEnded, "JobEnd callback not invoked!")
assert(assertEventListener.appEnded, "ApplicationEnd callback not invoked!")
def assertAllCallbacksInvoked() {
assert(jobStarted, "JobStart callback not invoked!")
assert(jobEnded, "JobEnd callback not invoked!")
assert(appEnded, "ApplicationEnd callback not invoked!")
}
}

/* Helper methods for validating state of the special files. */

/* -------------------------------------------------------- *
* Helper methods for validating state of the special files *
* -------------------------------------------------------- */

private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
Expand Down

0 comments on commit 5d38ffe

Please sign in to comment.