diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index a0af4becd721b..0ffe7768fff62 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,19 +26,17 @@ import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
- * Test for whether EventLoggingListener logs events properly.
+ * Test whether EventLoggingListener logs events properly.
  *
  * This tests whether EventLoggingListener actually creates special files while logging events,
  * whether the parsing of these special files is correct, and whether the logged events can be
  * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
-
-  import EventLoggingListenerSuite._
-
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
@@ -57,6 +55,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Verify special files exist") {
     testSpecialFilesExist()
     testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Verify special files exist with compression") {
     allCompressionCodecs.foreach { codec =>
       testSpecialFilesExist(compressionCodec = Some(codec))
     }
@@ -65,6 +66,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Parse event logging info") {
     testParsingLogInfo()
     testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Parse event logging info with compression") {
     allCompressionCodecs.foreach { codec =>
       testParsingLogInfo(compressionCodec = Some(codec))
     }
@@ -73,6 +77,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("Basic event logging") {
     testEventLogging()
     testEventLogging(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("Basic event logging with compression") {
     allCompressionCodecs.foreach { codec =>
       testEventLogging(compressionCodec = Some(codec))
     }
@@ -81,6 +88,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   test("End-to-end event logging") {
     testApplicationEventLogging()
     testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
+  }
+
+  test("End-to-end event logging with compression") {
     allCompressionCodecs.foreach { codec =>
       testApplicationEventLogging(compressionCodec = Some(codec))
     }
@@ -91,6 +101,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * Actual test logic *
    * ----------------- */
 
+  import EventLoggingListenerSuite._
+
   /**
    * Test whether names of special files are correctly identified and parsed.
   */
@@ -152,10 +164,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
-    eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
     val logDir = fileSystem.getFileStatus(logPath)
     assert(logDir.isDir)
+    eventLogger.start()
 
     // Verify special files are as expected before stop()
     var logFiles = fileSystem.listStatus(logPath)
@@ -228,13 +240,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify file contains exactly the two events logged
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assert(eventLoggingInfo.logPaths.size > 0)
-    val fileStream = {
-      val stream = fileSystem.open(eventLoggingInfo.logPaths.head)
-      eventLoggingInfo.compressionCodec.map { codec =>
-        codec.compressedInputStream(stream)
-      }.getOrElse(stream)
-    }
-    val lines = Source.fromInputStream(fileStream).getLines().toSeq
+    val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
     assert(lines.size === 2)
     assert(lines(0).contains("SparkListenerApplicationStart"))
     assert(lines(1).contains("SparkListenerApplicationEnd"))
@@ -272,28 +278,38 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   /**
    * Assert that all of the specified events are logged by the given EventLoggingListener.
    */
-  private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
+  private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    val logPath = eventLoggingInfo.logPaths.head
-    val fileStream = {
-      val stream = fileSystem.open(logPath)
-      eventLoggingInfo.compressionCodec.map { codec =>
-        codec.compressedInputStream(stream)
-      }.getOrElse(stream)
-    }
-    val lines = Source.fromInputStream(fileStream).getLines()
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
     val eventSet = mutable.Set(events: _*)
     lines.foreach { line =>
       eventSet.foreach { event =>
-        if (line.contains(event) &&
-          JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
-          eventSet.remove(event)
+        if (line.contains(event)) {
+          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+          val eventType = Utils.getFormattedClassName(parsedEvent)
+          if (eventType == event) {
+            eventSet.remove(event)
+          }
         }
       }
     }
     assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
+  /**
+   * Read all lines from the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def getLines(filePath: Path, compressionCodec: Option[CompressionCodec]): Seq[String] = {
+    val fstream = fileSystem.open(filePath)
+    val cstream =
+      compressionCodec.map { codec =>
+        codec.compressedInputStream(fstream)
+      }.getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().toSeq
+  }
+
   /**
    * A listener that asserts certain events are logged by the given EventLoggingListener.
    * This is necessary because events are posted asynchronously in a different thread.
@@ -304,28 +320,30 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     var appEnded = false
 
     override def onJobStart(jobStart: SparkListenerJobStart) {
-      assertEventExists(eventLogger, Seq[String](
-        "SparkListenerApplicationStart",
-        "SparkListenerBlockManagerAdded",
-        "SparkListenerEnvironmentUpdate"
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationStart),
+        Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+        Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
       ))
       jobStarted = true
     }
 
     override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-      assertEventExists(eventLogger, Seq[String](
-        "SparkListenerJobStart",
-        "SparkListenerJobEnd",
-        "SparkListenerStageSubmitted",
-        "SparkListenerStageCompleted",
-        "SparkListenerTaskStart",
-        "SparkListenerTaskEnd"
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerJobStart),
+        Utils.getFormattedClassName(SparkListenerJobEnd),
+        Utils.getFormattedClassName(SparkListenerStageSubmitted),
+        Utils.getFormattedClassName(SparkListenerStageCompleted),
+        Utils.getFormattedClassName(SparkListenerTaskStart),
+        Utils.getFormattedClassName(SparkListenerTaskEnd)
      ))
      jobEnded = true
    }
 
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-      assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationEnd)
+      ))
       appEnded = true
     }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7c61210469a8f..d1a0efe867231 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
- * Test for whether ReplayListenerBus replays events from logs correctly.
+ * Test whether ReplayListenerBus replays events from logs correctly.
  */
 class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/")
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index 4fef6583fe6a5..f477b245d8b37 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.io.CompressionCodec
 
 /**
- * Tests for writing files through the FileLogger.
+ * Test writing files through the FileLogger.
 */
 class FileLoggerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/")
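For reference, the read path that the new `getLines` helper consolidates out of its two call sites boils down to a wrap-if-compressed pattern: open the raw Hadoop stream, then decorate it with the codec's decompressing stream only when one is present. The sketch below illustrates this pattern standalone, assuming the same Spark APIs the suite already uses; `EventLogReadSketch` and `readLines` are hypothetical names, and the eager `toList` plus explicit close are choices of the sketch, not behavior of the patched suite.

```scala
import scala.io.Source

import org.apache.hadoop.fs.Path

import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils

// A minimal, hypothetical sketch of the wrap-if-compressed read pattern
// factored out by getLines() above; not part of the patch.
object EventLogReadSketch {
  private val fileSystem = Utils.getHadoopFileSystem("/")

  def readLines(filePath: Path, compressionCodec: Option[CompressionCodec]): Seq[String] = {
    // Open the raw file stream from the Hadoop FileSystem.
    val fstream = fileSystem.open(filePath)
    // Wrap it in a decompressing stream only if the log was written with a codec.
    val cstream = compressionCodec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
    try {
      // toList forces the read eagerly so closing the stream afterwards is safe;
      // Iterator.toSeq can be lazy, which is why the eager copy matters here.
      Source.fromInputStream(cstream).getLines().toList
    } finally {
      cstream.close()
    }
  }
}
```

Keeping the codec decision in one place is what lets both `testApplicationEventLogging` and `assertEventsExist` read plain and compressed logs through the same call.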