Clean up EventLoggingListenerSuite + modify a few comments
andrewor14 committed Apr 29, 2014
1 parent 862e752 commit ad2beff
Showing 3 changed files with 56 additions and 38 deletions.
EventLoggingListenerSuite.scala
@@ -26,19 +26,17 @@ import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
- * Test for whether EventLoggingListener logs events properly.
+ * Test whether EventLoggingListener logs events properly.
  *
  * This tests whether EventLoggingListener actually creates special files while logging events,
  * whether the parsing of these special files is correct, and whether the logged events can be
  * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
-
-  import EventLoggingListenerSuite._
-
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
@@ -57,6 +55,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Verify special files exist") {
     testSpecialFilesExist()
     testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Verify special files exist with compression") {
     allCompressionCodecs.foreach { codec =>
       testSpecialFilesExist(compressionCodec = Some(codec))
     }
@@ -65,6 +66,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Parse event logging info") {
     testParsingLogInfo()
     testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Parse event logging info with compression") {
     allCompressionCodecs.foreach { codec =>
       testParsingLogInfo(compressionCodec = Some(codec))
     }
@@ -73,6 +77,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Basic event logging") {
     testEventLogging()
     testEventLogging(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Basic event logging with compression") {
     allCompressionCodecs.foreach { codec =>
       testEventLogging(compressionCodec = Some(codec))
     }
@@ -81,6 +88,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("End-to-end event logging") {
     testApplicationEventLogging()
     testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("End-to-end event logging with compression") {
     allCompressionCodecs.foreach { codec =>
       testApplicationEventLogging(compressionCodec = Some(codec))
     }
@@ -91,6 +101,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * Actual test logic *
    * ----------------- */
 
+  import EventLoggingListenerSuite._
+
   /**
    * Test whether names of special files are correctly identified and parsed.
    */
@@ -152,10 +164,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
     val logDir = fileSystem.getFileStatus(logPath)
     assert(logDir.isDir)
-    eventLogger.start()
 
     // Verify special files are as expected before stop()
     var logFiles = fileSystem.listStatus(logPath)
@@ -228,13 +240,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify file contains exactly the two events logged
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assert(eventLoggingInfo.logPaths.size > 0)
-    val fileStream = {
-      val stream = fileSystem.open(eventLoggingInfo.logPaths.head)
-      eventLoggingInfo.compressionCodec.map { codec =>
-        codec.compressedInputStream(stream)
-      }.getOrElse(stream)
-    }
-    val lines = Source.fromInputStream(fileStream).getLines().toSeq
+    val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
     assert(lines.size === 2)
     assert(lines(0).contains("SparkListenerApplicationStart"))
     assert(lines(1).contains("SparkListenerApplicationEnd"))
@@ -272,28 +278,38 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   /**
    * Assert that all of the specified events are logged by the given EventLoggingListener.
    */
-  private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
+  private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    val logPath = eventLoggingInfo.logPaths.head
-    val fileStream = {
-      val stream = fileSystem.open(logPath)
-      eventLoggingInfo.compressionCodec.map { codec =>
-        codec.compressedInputStream(stream)
-      }.getOrElse(stream)
-    }
-    val lines = Source.fromInputStream(fileStream).getLines()
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
     val eventSet = mutable.Set(events: _*)
     lines.foreach { line =>
       eventSet.foreach { event =>
-        if (line.contains(event) &&
-          JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
-          eventSet.remove(event)
+        if (line.contains(event)) {
+          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+          val eventType = Utils.getFormattedClassName(parsedEvent)
+          if (eventType == event) {
+            eventSet.remove(event)
+          }
         }
       }
     }
     assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
+  /**
+   * Read all lines from the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def getLines(filePath: Path, compressionCodec: Option[CompressionCodec]): Seq[String] = {
+    val fstream = fileSystem.open(filePath)
+    val cstream =
+      compressionCodec.map { codec =>
+        codec.compressedInputStream(fstream)
+      }.getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().toSeq
+  }
+
   /**
    * A listener that asserts certain events are logged by the given EventLoggingListener.
    * This is necessary because events are posted asynchronously in a different thread.
@@ -304,28 +320,30 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     var appEnded = false
 
     override def onJobStart(jobStart: SparkListenerJobStart) {
-      assertEventExists(eventLogger, Seq[String](
-        "SparkListenerApplicationStart",
-        "SparkListenerBlockManagerAdded",
-        "SparkListenerEnvironmentUpdate"
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationStart),
+        Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+        Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
       ))
       jobStarted = true
     }
 
     override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-      assertEventExists(eventLogger, Seq[String](
-        "SparkListenerJobStart",
-        "SparkListenerJobEnd",
-        "SparkListenerStageSubmitted",
-        "SparkListenerStageCompleted",
-        "SparkListenerTaskStart",
-        "SparkListenerTaskEnd"
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerJobStart),
+        Utils.getFormattedClassName(SparkListenerJobEnd),
+        Utils.getFormattedClassName(SparkListenerStageSubmitted),
+        Utils.getFormattedClassName(SparkListenerStageCompleted),
+        Utils.getFormattedClassName(SparkListenerTaskStart),
+        Utils.getFormattedClassName(SparkListenerTaskEnd)
       ))
       jobEnded = true
     }
 
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-      assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationEnd)
+      ))
       appEnded = true
     }
 
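Two details of this diff are worth illustrating. First, the expected event names are now derived from the event types themselves via Utils.getFormattedClassName instead of hardcoded strings, passing each case class's companion object. This works because a companion object's runtime class name is the case class's name plus a trailing "$". The sketch below is a standalone illustration, not part of the commit; formattedClassName is a local stand-in that assumes the Spark helper of this era behaves roughly like getSimpleName with the "$" stripped.

// Standalone sketch: why passing a case class's companion object to a
// getFormattedClassName-style helper matches the type name of a parsed event.
case class SparkListenerJobStart(jobId: Int)

object ClassNameDemo extends App {
  // Local stand-in for Utils.getFormattedClassName (assumed behavior).
  def formattedClassName(obj: AnyRef): String =
    obj.getClass.getSimpleName.replace("$", "")

  // The companion object's runtime class is SparkListenerJobStart$, so
  // getSimpleName alone would not match the event name in the logged JSON.
  println(SparkListenerJobStart.getClass.getSimpleName)         // SparkListenerJobStart$
  println(formattedClassName(SparkListenerJobStart))            // SparkListenerJobStart

  // A parsed event instance formats to the same name, so the comparison in
  // assertEventsExist lines up with the names built in the listener callbacks.
  println(formattedClassName(SparkListenerJobStart(jobId = 0))) // SparkListenerJobStart
}

Second, the new getLines helper centralizes the open-then-maybe-decompress logic that was previously duplicated in two places. Below is a self-contained approximation, with java.io standing in for Hadoop's FileSystem.open and a plain InputStream => InputStream function standing in for Spark's CompressionCodec; all names here are illustrative only.

import java.io.{BufferedInputStream, FileInputStream, InputStream}
import scala.io.Source

object GetLinesDemo {
  // Open the file, optionally wrap the stream in a decompressor, then read
  // the lines eagerly so the stream can be closed before returning.
  def getLines(path: String, codec: Option[InputStream => InputStream]): Seq[String] = {
    val fstream = new BufferedInputStream(new FileInputStream(path))
    val cstream = codec.map(wrap => wrap(fstream)).getOrElse(fstream)
    try Source.fromInputStream(cstream).getLines().toList
    finally cstream.close()
  }
}

For example, GetLinesDemo.getLines("events.log", None) reads a plain log, while GetLinesDemo.getLines("events.log.gz", Some(new java.util.zip.GZIPInputStream(_))) reads a gzip-compressed one.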
ReplayListenerSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
- * Test for whether ReplayListenerBus replays events from logs correctly.
+ * Test whether ReplayListenerBus replays events from logs correctly.
  */
 class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/")
FileLoggerSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.io.CompressionCodec
 
 /**
- * Tests for writing files through the FileLogger.
+ * Test writing files through the FileLogger.
  */
 class FileLoggerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/")
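For context, all three suites build their SparkConf through a shared getLoggingConf helper (pulled in by the import EventLoggingListenerSuite._ line above) whose body falls outside the displayed hunks. The sketch below is a hypothetical reconstruction, not the commit's code; it assumes only the 1.0-era event-logging configuration keys.

import org.apache.spark.SparkConf

object LoggingConfSketch {
  // Hypothetical shape of the shared test-conf helper referenced by the
  // suites above; the real implementation is not shown in this diff.
  def getLoggingConf(
      logDirPath: Option[String] = None,
      compressionCodec: Option[String] = None): SparkConf = {
    val conf = new SparkConf
    conf.set("spark.eventLog.enabled", "true")
    logDirPath.foreach { dir => conf.set("spark.eventLog.dir", dir) }
    compressionCodec.foreach { codec =>
      conf.set("spark.eventLog.compress", "true")
      conf.set("spark.io.compression.codec", codec)
    }
    conf
  }
}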
