From e12f4b1ef342b6f0a1b2188904951bb4256263b2 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 11 Apr 2014 14:16:49 -0700
Subject: [PATCH 01/11] Preliminary tests for EventLoggingListener (need major cleanup)

---
 .../scheduler/EventLoggingListener.scala      |   3 +-
 .../scheduler/EventLoggingListenerSuite.scala | 363 ++++++++++++++++++
 2 files changed, 365 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b983c16af14f4..e891710221041 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,7 +44,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
   private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val logBaseDir = conf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
@@ -115,6 +115,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
new file mode 100644
index 0000000000000..45e9f5b7cb12d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.io.Source
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test for whether EventLoggingListener logs events properly.
+ *
+ * This checks whether special files are created using the specified configurations, and whether
+ * logged events can be read back into memory as expected.
+ */
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+
+  test("Parse names of special files") {
+    testParsingFileName()
+  }
+
+  test("Verify special files exist") {
+    testSpecialFilesExist()
+    testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
+    allCompressionCodecs.foreach { codec =>
+      testSpecialFilesExist(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Parse event logging info") {
+    testParsingLogInfo()
+    testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
+    allCompressionCodecs.foreach { codec =>
+      testParsingLogInfo(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Basic event logging") {
+    testEventLogging()
+    testEventLogging(logDirPath = Some("/tmp/spark-foo"))
+    allCompressionCodecs.foreach { codec =>
+      testEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+  test("End-to-end event logging") {
+    testApplicationEventLogging()
+    testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
+    allCompressionCodecs.foreach { codec =>
+      testApplicationEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+
+  /**
+   * Test whether names of special files are correctly identified and parsed.
+   */
+  private def testParsingFileName() {
+    val logPrefix = EventLoggingListener.LOG_PREFIX
+    val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
+    val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
+    val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
+    }
+
+    // Negatives
+    assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
+    assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
+    assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
+    assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
+
+    // Verify that parsing is correct
+    assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
+    }
+  }
+
+  /**
+   * Test whether the special files produced by EventLoggingListener exist.
+   *
+   * There should be exactly one event log and one spark version file throughout the entire
+   * execution. If a compression codec is specified, then the compression codec file should
+   * also exist. Only after the application has completed does the test expect the application
+   * completed file to be present.
+   */
+  private def testSpecialFilesExist(
+      logDirPath: Option[String] = None,
+      compressionCodec: Option[String] = None) {
+
+    def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
+      val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
+      val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
+      assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
+      assert(eventLogsExist(logFiles))
+      assert(sparkVersionExists(logFiles))
+      assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
+      assert(applicationCompleteExists(logFiles) === loggerStopped)
+      assertSparkVersionIsValid(logFiles)
+      compressionCodec.foreach { codec =>
+        assertCompressionCodecIsValid(logFiles, codec)
+      }
+    }
+
+    // Verify logging directory exists
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    val logPath = new Path(eventLogger.logDir)
+    val logDir = fileSystem.getFileStatus(logPath)
+    assert(logDir.isDir)
+
+    // Verify special files are as expected before stop()
+    var logFiles = fileSystem.listStatus(logPath)
+    assert(logFiles != null)
+    assertFilesExist(logFiles, loggerStopped = false)
+
+    // Verify special files are as expected after stop()
+    eventLogger.stop()
+    logFiles = fileSystem.listStatus(logPath)
+    assertFilesExist(logFiles, loggerStopped = true)
+  }
+
+  /**
+   * Test whether EventLoggingListener correctly parses the correct information from the logs.
+   *
+   * This includes whether it returns the correct Spark version, compression codec (if any),
+   * and the application's completion status.
+   */
+  private def testParsingLogInfo(
+      logDirPath: Option[String] = None,
+      compressionCodec: Option[String] = None) {
+
+    def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
+      assert(info.logPaths.size > 0)
+      assert(info.sparkVersion === SparkContext.SPARK_VERSION)
+      assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
+      info.compressionCodec.foreach { codec =>
+        assert(compressionCodec.isDefined)
+        val expectedCodec = compressionCodec.get.split('.').last
+        assert(codec.getClass.getSimpleName === expectedCodec)
+      }
+      assert(info.applicationComplete === loggerStopped)
+    }
+
+    // Verify that all information is correctly parsed before stop()
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
+    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
+
+    // Verify that all information is correctly parsed after stop()
+    eventLogger.stop()
+    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
+  }
+
+  /**
+   * Test basic event logging functionality.
+   *
+   * This creates two simple events, posts them to the EventLoggingListener, and verifies that
+   * exactly these two events are logged in the expected file.
+   */
+  private def testEventLogging(
+      logDirPath: Option[String] = None,
+      compressionCodec: Option[String] = None) {
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    val listenerBus = new LiveListenerBus
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
+    eventLogger.start()
+    listenerBus.start()
+    listenerBus.addListener(eventLogger)
+    listenerBus.postToAll(applicationStart)
+    listenerBus.postToAll(applicationEnd)
+
+    // Verify file contains exactly the two events logged
+    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val fileStream = {
+      val stream = fileSystem.open(eventLoggingInfo.logPaths.head)
+      eventLoggingInfo.compressionCodec.map { codec =>
+        codec.compressedInputStream(stream)
+      }.getOrElse(stream)
+    }
+    val lines = Source.fromInputStream(fileStream).getLines().toSeq
+    assert(lines.size === 2)
+    assert(lines(0).contains("SparkListenerApplicationStart"))
+    assert(lines(1).contains("SparkListenerApplicationEnd"))
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+    eventLogger.stop()
+  }
+
+  /**
+   * Test end-to-end event logging functionality in an application.
+   */
+  private def testApplicationEventLogging(
+      logDirPath: Option[String] = None,
+      compressionCodec: Option[String] = None) {
+
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val sc = new SparkContext("local", "test", conf)
+    assert(sc.eventLogger.isDefined)
+    val eventLogger = sc.eventLogger.get
+    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
+    val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
+    assert(eventLogger.logDir.startsWith(expectedLogDir))
+
+    // Assert all specified events are found in the event log
+    def assertEventExists(events: Seq[String]) {
+      val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+      val logPath = eventLoggingInfo.logPaths.head
+      val fileStream = {
+        val stream = fileSystem.open(logPath)
+        eventLoggingInfo.compressionCodec.map { codec =>
+          codec.compressedInputStream(stream)
+        }.getOrElse(stream)
+      }
+      val lines = Source.fromInputStream(fileStream).getLines()
+      val eventSet = mutable.Set(events: _*)
+      lines.foreach { line =>
+        eventSet.foreach { event =>
+          if (line.contains(event) &&
+              JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
+            eventSet.remove(event)
+          }
+        }
+      }
+      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+    }
+
+    // SparkListenerEvents are posted in a separate thread
+    class AssertEventListener extends SparkListener {
+      var jobStarted = false
+      var jobEnded = false
+      var appEnded = false
+
+      override def onJobStart(jobStart: SparkListenerJobStart) {
+        assertEventExists(Seq[String](
+          "SparkListenerApplicationStart",
+          "SparkListenerBlockManagerAdded",
+          "SparkListenerEnvironmentUpdate"
+        ))
+        jobStarted = true
+      }
+
+      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+        assertEventExists(Seq[String](
+          "SparkListenerJobStart",
+          "SparkListenerJobEnd",
+          "SparkListenerStageSubmitted",
+          "SparkListenerStageCompleted",
+          "SparkListenerTaskStart",
+          "SparkListenerTaskEnd"
+        ))
+        jobEnded = true
+      }
+
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+        assertEventExists(Seq[String]("SparkListenerApplicationEnd"))
+        appEnded = true
+      }
+    }
+    val assertEventListener = new AssertEventListener
+    sc.addSparkListener(assertEventListener)
+
+    // Trigger callbacks
+    sc.parallelize(1 to 10000).count()
+    sc.stop()
+    assert(assertEventListener.jobStarted, "JobStart callback not invoked!")
+    assert(assertEventListener.jobEnded, "JobEnd callback not invoked!")
+    assert(assertEventListener.appEnded, "ApplicationEnd callback not invoked!")
+  }
+
+  /* Helper methods for validating state of the special files. */
+
+  private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
+  }
+
+  private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
+  }
+
+  private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
+  }
+
+  private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
+  }
+
+  private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
+  }
+
+  private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
+  }
+
+  /** Get a SparkConf with event logging enabled. */
+  private def getLoggingConf(
+      logDir: Option[String] = None,
+      compressionCodec: Option[String] = None) = {
+    val conf = new SparkConf
+    conf.set("spark.eventLog.enabled", "true")
+    logDir.foreach { dir =>
+      conf.set("spark.eventLog.dir", dir)
+    }
+    compressionCodec.foreach { codec =>
+      conf.set("spark.eventLog.compress", "true")
+      conf.set("spark.io.compression.codec", codec)
+    }
+    conf
+  }
+
+}
\ No newline at end of file

From 5d38ffe98d9b15ce88518cdb50420fe6c39a04e2 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 11 Apr 2014 14:43:49 -0700
Subject: [PATCH 02/11] Clean up EventLoggingListenerSuite + add comments

---
 .../spark/scheduler/SparkListenerBus.scala    |   2 +-
 .../scheduler/EventLoggingListenerSuite.scala | 146 ++++++++++--------
 2 files changed, 82 insertions(+), 66 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d6df193d9bcf8..0286aac8769b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
   */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 45e9f5b7cb12d..1940d024d6573 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,16 +26,15 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.{JsonProtocol, Utils}
-import org.apache.spark.io.CompressionCodec
 
 /**
  * Test for whether EventLoggingListener logs events properly.
  *
- * This checks whether special files are created using the specified configurations, and whether
- * logged events can be read back into memory as expected.
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
-
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
@@ -79,6 +78,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   }
 
 
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
   /**
    * Test whether names of special files are correctly identified and parsed.
    */
@@ -182,7 +185,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     eventLogger.start()
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
 
@@ -215,7 +217,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     listenerBus.postToAll(applicationEnd)
 
     // Verify file contains exactly the two events logged
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assert(eventLoggingInfo.logPaths.size > 0)
     val fileStream = {
@@ -235,86 +236,101 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   /**
    * Test end-to-end event logging functionality in an application.
+   * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(
       logDirPath: Option[String] = None,
       compressionCodec: Option[String] = None) {
-
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
     assert(eventLogger.logDir.startsWith(expectedLogDir))
 
-    // Assert all specified events are found in the event log
-    def assertEventExists(events: Seq[String]) {
-      val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-      val logPath = eventLoggingInfo.logPaths.head
-      val fileStream = {
-        val stream = fileSystem.open(logPath)
-        eventLoggingInfo.compressionCodec.map { codec =>
-          codec.compressedInputStream(stream)
-        }.getOrElse(stream)
-      }
-      val lines = Source.fromInputStream(fileStream).getLines()
-      val eventSet = mutable.Set(events: _*)
-      lines.foreach { line =>
-        eventSet.foreach { event =>
-          if (line.contains(event) &&
-            JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
-            eventSet.remove(event)
-          }
+    // Begin listening for events that trigger asserts
+    val eventExistenceListener = new EventExistenceListener(eventLogger)
+    sc.addSparkListener(eventExistenceListener)
+
+    // Trigger asserts for whether the expected events are actually logged
+    sc.parallelize(1 to 10000).count()
+    sc.stop()
+
+    // Ensure all asserts have actually been triggered
+    eventExistenceListener.assertAllCallbacksInvoked()
+  }
+
+  /**
+   * Assert that all of the specified events are logged by the given EventLoggingListener.
+   */
+  private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    val logPath = eventLoggingInfo.logPaths.head
+    val fileStream = {
+      val stream = fileSystem.open(logPath)
+      eventLoggingInfo.compressionCodec.map { codec =>
+        codec.compressedInputStream(stream)
+      }.getOrElse(stream)
+    }
+    val lines = Source.fromInputStream(fileStream).getLines()
+    val eventSet = mutable.Set(events: _*)
+    lines.foreach { line =>
+      eventSet.foreach { event =>
+        if (line.contains(event) &&
+            JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
+          eventSet.remove(event)
         }
       }
-      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
     }
+    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+  }
 
-    // SparkListenerEvents are posted in a separate thread
-    class AssertEventListener extends SparkListener {
-      var jobStarted = false
-      var jobEnded = false
-      var appEnded = false
-
-      override def onJobStart(jobStart: SparkListenerJobStart) {
-        assertEventExists(Seq[String](
-          "SparkListenerApplicationStart",
-          "SparkListenerBlockManagerAdded",
-          "SparkListenerEnvironmentUpdate"
-        ))
-        jobStarted = true
-      }
+  /**
+   * A listener that asserts certain events are logged by the given EventLoggingListener.
+   * This is necessary because events are posted asynchronously in a different thread.
+   */
+  private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+    var jobStarted = false
+    var jobEnded = false
+    var appEnded = false
+
+    override def onJobStart(jobStart: SparkListenerJobStart) {
+      assertEventExists(eventLogger, Seq[String](
+        "SparkListenerApplicationStart",
+        "SparkListenerBlockManagerAdded",
+        "SparkListenerEnvironmentUpdate"
+      ))
+      jobStarted = true
+    }
 
-      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-        assertEventExists(Seq[String](
-          "SparkListenerJobStart",
-          "SparkListenerJobEnd",
-          "SparkListenerStageSubmitted",
-          "SparkListenerStageCompleted",
-          "SparkListenerTaskStart",
-          "SparkListenerTaskEnd"
-        ))
-        jobEnded = true
-      }
+    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+      assertEventExists(eventLogger, Seq[String](
+        "SparkListenerJobStart",
+        "SparkListenerJobEnd",
+        "SparkListenerStageSubmitted",
+        "SparkListenerStageCompleted",
+        "SparkListenerTaskStart",
+        "SparkListenerTaskEnd"
+      ))
+      jobEnded = true
+    }
 
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-        assertEventExists(Seq[String]("SparkListenerApplicationEnd"))
-        appEnded = true
-      }
+    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+      assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
+      appEnded = true
     }
-    val assertEventListener = new AssertEventListener
-    sc.addSparkListener(assertEventListener)
 
-    // Trigger callbacks
-    sc.parallelize(1 to 10000).count()
-    sc.stop()
-    assert(assertEventListener.jobStarted, "JobStart callback not invoked!")
-    assert(assertEventListener.jobEnded, "JobEnd callback not invoked!")
-    assert(assertEventListener.appEnded, "ApplicationEnd callback not invoked!")
+    def assertAllCallbacksInvoked() {
+      assert(jobStarted, "JobStart callback not invoked!")
+      assert(jobEnded, "JobEnd callback not invoked!")
+      assert(appEnded, "ApplicationEnd callback not invoked!")
+    }
   }
 
-  /* Helper methods for validating state of the special files. */
+
+  /* -------------------------------------------------------- *
+   * Helper methods for validating state of the special files *
+   * -------------------------------------------------------- */
 
   private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
     logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)

From 187bb2587b6df07a77a72d4be19e1ed0d12489ff Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 28 Apr 2014 10:34:37 -0700
Subject: [PATCH 03/11] Formatting and renaming variables

---
 .../scheduler/EventLoggingListener.scala    | 22 +++++++++++--------
 .../org/apache/spark/util/FileLogger.scala  | 12 +++++-----
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 6be5af0d67d3a..c17a355cbfb7f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}
 
@@ -39,22 +40,25 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
+  def this(appName: String, sparkConf: SparkConf) = {
+    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration())
+  }
+
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
   private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite)
+    new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)
 
   /**
    * Begin logging events.
@@ -63,7 +67,7 @@ private[spark] class EventLoggingListener(
   def start() {
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec = sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 1ed3b70bb24fd..9242ff824b026 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -38,8 +38,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true)
@@ -53,7 +53,7 @@ private[spark] class FileLogger(
   var fileIndex = 0
 
   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -87,8 +87,8 @@ private[spark] class FileLogger(
   private def createWriter(fileName: String): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */

From ab66a846b23bcc16377afd35ec9b897f921b4626 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 28 Apr 2014 20:11:47 -0700
Subject: [PATCH 04/11] Tests for FileLogger + delete file after tests

---
 .../scheduler/EventLoggingListener.scala      |   9 +-
 .../org/apache/spark/util/FileLogger.scala    |   5 +-
 .../scheduler/EventLoggingListenerSuite.scala |   8 +-
 .../apache/spark/util/FileLoggerSuite.scala   | 153 ++++++++++++++++++
 4 files changed, 166 insertions(+), 9 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index c17a355cbfb7f..e7780c6ed3f66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -41,15 +41,11 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
 private[spark] class EventLoggingListener(
     appName: String,
     sparkConf: SparkConf,
-    hadoopConf: Configuration)
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appName: String, sparkConf: SparkConf) = {
-    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration())
-  }
-
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
@@ -67,7 +63,8 @@ private[spark] class EventLoggingListener(
   def start() {
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 9242ff824b026..3725d3a2ee848 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 
 /**
@@ -39,7 +40,7 @@ import org.apache.spark.io.CompressionCodec
 private[spark] class FileLogger(
     logDir: String,
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true)
@@ -116,7 +117,7 @@ private[spark] class FileLogger(
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 1940d024d6573..ea3582415027c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 import scala.io.Source
+import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
@@ -41,6 +42,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.SnappyCompressionCodec"
   )
 
+  after {
+    Try { fileSystem.delete(new Path("/tmp/spark-events"), true) }
+    Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) }
+  }
+
   test("Parse names of special files") {
     testParsingFileName()
   }
@@ -376,4 +382,4 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     conf
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
new file mode 100644
index 0000000000000..60f02f54dce4d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.io.Source
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Tests for writing files through the FileLogger.
+ */
+class FileLoggerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val logDir = "/tmp/test-file-logger"
+  private val logDirPath = new Path(logDir)
+
+  after {
+    Try { fileSystem.delete(logDirPath, true) }
+    Try { fileSystem.delete(new Path("falafel"), true) }
+  }
+
+  test("simple write") {
+    testSingleFile()
+  }
+
+  test ("simple write with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSingleFile(Some(codec))
+    }
+  }
+
+  test("multiple files") {
+    testMultipleFiles()
+  }
+
+  test("multiple files with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testMultipleFiles(Some(codec))
+    }
+  }
+
+  test("logging when directory already exists") {
+    // Create the logging directory multiple times
+    new FileLogger(logDir, new SparkConf, overwrite = true)
+    new FileLogger(logDir, new SparkConf, overwrite = true)
+    new FileLogger(logDir, new SparkConf, overwrite = true)
+
+    // If overwrite is not enabled, an exception should be thrown
+    intercept[IOException] { new FileLogger(logDir, new SparkConf, overwrite = false) }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test logging to a single file.
+   */
+  private def testSingleFile(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDir, conf, compress = true)
+      } else {
+        new FileLogger(logDir, conf)
+      }
+    assert(fileSystem.exists(logDirPath))
+    assert(fileSystem.getFileStatus(logDirPath).isDir)
+    assert(fileSystem.listStatus(logDirPath).size === 0)
+
+    logger.newFile()
+    val files = fileSystem.listStatus(logDirPath)
+    assert(files.size === 1)
+    val firstFile = files.head
+    val firstFilePath = firstFile.getPath
+
+    logger.log("hello")
+    logger.flush()
+    assert(readFileContent(firstFilePath, codec) === "hello")
+
+    logger.log(" world")
+    logger.close()
+    assert(readFileContent(firstFilePath, codec) === "hello world")
+  }
+
+  /**
+   * Test logging to multiple files.
+   */
+  private def testMultipleFiles(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDir, conf, compress = true)
+      } else {
+        new FileLogger(logDir, conf)
+      }
+    assert(fileSystem.exists(logDirPath))
+    assert(fileSystem.getFileStatus(logDirPath).isDir)
+    assert(fileSystem.listStatus(logDirPath).size === 0)
+
+    logger.newFile("Jean_Valjean")
+    logger.logLine("Who am I?")
+    logger.logLine("Destiny?")
+    logger.newFile("John_Valjohn")
+    logger.logLine("One")
+    logger.logLine("Two three four...")
+    logger.close()
+    assert(readFileContent(new Path(logDir + "/Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+    assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...")
+  }
+
+  private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
+    val fstream = fileSystem.open(logPath)
+    val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().mkString("\n")
+  }
+
+  private def getLoggingConf(codecName: Option[String]) = {
+    val conf = new SparkConf
+    codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
+    conf
+  }
+
+}

From b9904537ca19e3e18504a1f448cc77958bbbb909 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 28 Apr 2014 22:16:41 -0700
Subject: [PATCH 05/11] ReplayListenerBus suite - tests do not all pass yet

---
 .../org/apache/spark/util/JsonProtocol.scala  |  10 +-
 .../scheduler/EventLoggingListenerSuite.scala |  11 +-
 .../spark/scheduler/ReplayListenerSuite.scala | 169 ++++++++++++++++++
 .../apache/spark/util/FileLoggerSuite.scala   |  10 +-
 4 files changed, 188 insertions(+), 12 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9aed3e0985654..09825087bb048 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }
 
   def UUIDFromJson(json: JValue): UUID = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index ea3582415027c..f202d94bbf7c7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -36,6 +36,9 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+
+  import EventLoggingListenerSuite._
+
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
@@ -366,8 +369,13 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
   }
 
+}
+
+
+object EventLoggingListenerSuite {
+
   /** Get a SparkConf with event logging enabled. */
-  private def getLoggingConf(
+  def getLoggingConf(
       logDir: Option[String] = None,
       compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
@@ -381,5 +389,4 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     }
     conf
   }
-
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
new file mode 100644
index 0000000000000..9ea0d5254178a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test for whether ReplayListenerBus replays events from logs correctly.
+ */
+class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+
+  after {
+
+  }
+
+  test("Simple replay") {
+    testSimpleReplay()
+  }
+
+  test("Simple replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSimpleReplay(Some(codec))
+    }
+  }
+
+  test("End-to-end replay") {
+    testApplicationReplay()
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test simple replaying of events.
+   */
+  private def testSimpleReplay(codecName: Option[String] = None) {
+    val logFilePath = new Path("/tmp/events.txt")
+    val codec = codecName.map(getCompressionCodec)
+    val fstream = fileSystem.create(logFilePath)
+    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val writer = new PrintWriter(cstream)
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    writer.close()
+    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val eventKeeper = new EventKeeper
+    replayer.addListener(eventKeeper)
+    replayer.replay()
+    assert(eventKeeper.events.size === 2)
+    assert(eventKeeper.events(0) === applicationStart)
+    assert(eventKeeper.events(1) === applicationEnd)
+  }
+
+  /**
+   *
+   */
+  private def testApplicationReplay(codecName: Option[String] = None) {
+    val logDir = "/tmp/test-replay"
+    val logDirPath = new Path(logDir)
+    val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
+    val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+    val eventKeeper = new EventKeeper
+    sc.addSparkListener(eventKeeper)
+
+    // Run a job
+    sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().cache().count()
+    sc.stop()
+
+    // Find the log file
+    val applications = fileSystem.listStatus(logDirPath)
+    assert(applications != null && applications.size > 0)
+    val eventLogDir =
+      applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last
+    assert(eventLogDir.isDir)
+    val logFiles = fileSystem.listStatus(eventLogDir.getPath)
+    assert(logFiles != null && logFiles.size > 0)
+    val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
+    assert(logFile.isDefined)
+    val codec = codecName.map(getCompressionCodec)
+
+    // Replay events
+    val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec)
+    val replayEventKeeper = new EventKeeper
+    replayer.addListener(replayEventKeeper)
+    replayer.replay()
+
+    // Verify the same events are replayed in the same order
+    val filteredEvents = filterSchedulerEvents(eventKeeper.events)
+    val filteredReplayEvents = filterSchedulerEvents(replayEventKeeper.events)
+    assert(filteredEvents.size === filteredReplayEvents.size)
+    filteredEvents.zip(filteredReplayEvents).foreach { case (e1, e2) =>
+      assert(JsonProtocol.sparkEventToJson(e1) === JsonProtocol.sparkEventToJson(e2))
+    }
+  }
+
+  /**
+   * A simple listener that keeps all events it receives
+   */
+  private class EventKeeper extends SparkListener {
+    val events = new ArrayBuffer[SparkListenerEvent]
+    override def onStageSubmitted(e: SparkListenerStageSubmitted) { events += e }
+    override def onStageCompleted(e: SparkListenerStageCompleted) { events += e }
+    override def onTaskStart(e: SparkListenerTaskStart) { events += e }
+    override def onTaskGettingResult(e: SparkListenerTaskGettingResult) { events += e }
+    override def onTaskEnd(e: SparkListenerTaskEnd) { events += e }
+    override def onJobStart(e: SparkListenerJobStart) { events += e }
+    override def onJobEnd(e: SparkListenerJobEnd) { events += e }
+    override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) { events += e }
+    override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = { events += e }
+    override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = { events += e }
+    override def onUnpersistRDD(e: SparkListenerUnpersistRDD) { events += e }
+    override def onApplicationStart(e: SparkListenerApplicationStart) { events += e }
+    override def onApplicationEnd(e: SparkListenerApplicationEnd) { events += e }
+  }
+
+  private def filterSchedulerEvents(events: Seq[SparkListenerEvent]): Seq[SparkListenerEvent] = {
+    events.collect {
+      case e: SparkListenerStageSubmitted => e
+      case e: SparkListenerStageCompleted => e
+      case e: SparkListenerTaskStart => e
+      case e: SparkListenerTaskGettingResult => e
+      case e: SparkListenerTaskEnd => e
+      case e: SparkListenerJobStart => e
+      case e: SparkListenerJobEnd => e
+    }
+  }
+
+  private def getCompressionCodec(codecName: String) = {
+    val conf = new SparkConf
+    conf.set("spark.io.compression.codec", codecName)
+    CompressionCodec.createCodec(conf)
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index 60f02f54dce4d..4fef6583fe6a5 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -45,27 +45,27 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     Try { fileSystem.delete(new Path("falafel"), true) }
   }
 
-  test("simple write") {
+  test("Simple logging") {
     testSingleFile()
   }
 
-  test ("simple write with compression") {
+  test ("Simple logging with compression") {
     allCompressionCodecs.foreach { codec =>
       testSingleFile(Some(codec))
     }
   }
 
-  test("multiple files") {
+  test("Logging multiple files") {
     testMultipleFiles()
   }
 
-  test("multiple files with compression") {
+  test("Logging multiple files with compression") {
     allCompressionCodecs.foreach { codec =>
       testMultipleFiles(Some(codec))
     }
   }
 
-  test("logging when directory already exists") {
+  test("Logging when directory already exists") {
     // Create the logging directory multiple times
     new FileLogger(logDir, new SparkConf, overwrite = true)
     new FileLogger(logDir, new SparkConf, overwrite = true)

From e0ba2f8e8c37adc4c57684b2a42aa20fc2e19b19 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 29 Apr 2014 13:25:04 -0700
Subject: [PATCH 06/11] Fix test failures caused by race condition in
 processing/mutating events

An event can be mutated by the DAGScheduler in between being processed by
one listener and being processed by another. This causes the
ReplayListenerSuite to be flaky. This commit ensures that the event logged
is the same as the original event received by the EventLoggingListener.
---
 .../scheduler/EventLoggingListener.scala      |  15 ++-
 .../scheduler/EventLoggingListenerSuite.scala |   1 +
 .../spark/scheduler/ReplayListenerSuite.scala | 100 +++++++--------
 3 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index e7780c6ed3f66..4b5bdca34ef56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -48,14 +50,18 @@ private[spark] class EventLoggingListener(
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
+  protected val logger =
     new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)
 
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
+
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
@@ -73,11 +79,14 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index f202d94bbf7c7..a0af4becd721b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -380,6 +380,7 @@ object EventLoggingListenerSuite {
       compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
+    conf.set("spark.eventLog.testing", "true")
     logDir.foreach { dir =>
       conf.set("spark.eventLog.dir", dir)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 9ea0d5254178a..7c61210469a8f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.PrintWriter
 
-import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.util.{JsonProtocol, Utils}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
  * Test for whether ReplayListenerBus replays events from logs correctly.
@@ -41,7 +41,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   )
 
   after {
-
+    Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
+    Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
   }
 
   test("Simple replay") {
@@ -54,10 +55,18 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
+  // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
   }
 
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationReplay(Some(codec))
+    }
+  }
+
 
   /* ----------------- *
    * Actual test logic *
@@ -78,31 +87,37 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
     writer.close()
     val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
-    val eventKeeper = new EventKeeper
-    replayer.addListener(eventKeeper)
+    val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
     replayer.replay()
-    assert(eventKeeper.events.size === 2)
-    assert(eventKeeper.events(0) === applicationStart)
-    assert(eventKeeper.events(1) === applicationEnd)
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
   /**
+   * Test end-to-end replaying of events.
    *
+   * This test runs a few simple jobs with event logging enabled, and compares each emitted
+   * event to the corresponding event replayed from the event logs. This test makes the
+   * assumption that the event logging behavior is correct (tested in a separate suite).
*/ private def testApplicationReplay(codecName: Option[String] = None) { val logDir = "/tmp/test-replay" - val logDirPath = new Path(logDir) val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName) val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf) - val eventKeeper = new EventKeeper - sc.addSparkListener(eventKeeper) - // Run a job - sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().cache().count() + // Run a few jobs + sc.parallelize(1 to 100, 1).count() + sc.parallelize(1 to 100, 2).map(i => (i, i)).count() + sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count() + sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count() sc.stop() - // Find the log file - val applications = fileSystem.listStatus(logDirPath) + // Prepare information needed for replay + val codec = codecName.map(getCompressionCodec) + val applications = fileSystem.listStatus(new Path(logDir)) assert(applications != null && applications.size > 0) val eventLogDir = applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last @@ -111,53 +126,34 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { assert(logFiles != null && logFiles.size > 0) val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_")) assert(logFile.isDefined) - val codec = codecName.map(getCompressionCodec) // Replay events val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec) - val replayEventKeeper = new EventKeeper - replayer.addListener(replayEventKeeper) + val eventMonster = new EventMonster(conf) + replayer.addListener(eventMonster) replayer.replay() // Verify the same events are replayed in the same order - val filteredEvents = filterSchedulerEvents(eventKeeper.events) - val filteredReplayEvents = filterSchedulerEvents(replayEventKeeper.events) - assert(filteredEvents.size === filteredReplayEvents.size) - filteredEvents.zip(filteredReplayEvents).foreach { case (e1, e2) => - assert(JsonProtocol.sparkEventToJson(e1) === JsonProtocol.sparkEventToJson(e2)) - } + assert(sc.eventLogger.isDefined) + val originalEvents = sc.eventLogger.get.loggedEvents + val replayedEvents = eventMonster.loggedEvents + originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) } } /** - * A simple listener that keeps all events it receives + * A simple listener that buffers all the events it receives. + * + * The event buffering functionality must be implemented within EventLoggingListener itself. + * This is because of the following race condition: the event may be mutated between being + * processed by one listener and being processed by another. Thus, in order to establish + * a fair comparison between the original events and the replayed events, both functionalities + * must be implemented within one listener (i.e. the EventLoggingListener). + * + * This child listener inherits only the event buffering functionality, but does not actually + * log the events. 
*/ - private class EventKeeper extends SparkListener { - val events = new ArrayBuffer[SparkListenerEvent] - override def onStageSubmitted(e: SparkListenerStageSubmitted) { events += e } - override def onStageCompleted(e: SparkListenerStageCompleted) { events += e } - override def onTaskStart(e: SparkListenerTaskStart) { events += e } - override def onTaskGettingResult(e: SparkListenerTaskGettingResult) { events += e } - override def onTaskEnd(e: SparkListenerTaskEnd) { events += e } - override def onJobStart(e: SparkListenerJobStart) { events += e } - override def onJobEnd(e: SparkListenerJobEnd) { events += e } - override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) { events += e } - override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = { events += e } - override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = { events += e } - override def onUnpersistRDD(e: SparkListenerUnpersistRDD) { events += e } - override def onApplicationStart(e: SparkListenerApplicationStart) { events += e } - override def onApplicationEnd(e: SparkListenerApplicationEnd) { events += e } - } - - private def filterSchedulerEvents(events: Seq[SparkListenerEvent]): Seq[SparkListenerEvent] = { - events.collect { - case e: SparkListenerStageSubmitted => e - case e: SparkListenerStageCompleted => e - case e: SparkListenerTaskStart => e - case e: SparkListenerTaskGettingResult => e - case e: SparkListenerTaskEnd => e - case e: SparkListenerJobStart => e - case e: SparkListenerJobEnd => e - } + private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) { + logger.close() } private def getCompressionCodec(codecName: String) = { From ad2beff5226b4ff93d36252cfceca883cfb789fa Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 29 Apr 2014 14:17:45 -0700 Subject: [PATCH 07/11] Clean up EventLoggingListenerSuite + modify a few comments --- .../scheduler/EventLoggingListenerSuite.scala | 90 +++++++++++-------- .../spark/scheduler/ReplayListenerSuite.scala | 2 +- .../apache/spark/util/FileLoggerSuite.scala | 2 +- 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index a0af4becd721b..0ffe7768fff62 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -26,19 +26,17 @@ import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} /** - * Test for whether EventLoggingListener logs events properly. + * Test whether EventLoggingListener logs events properly. * * This tests whether EventLoggingListener actually creates special files while logging events, * whether the parsing of these special files is correct, and whether the logged events can be * read and deserialized into actual SparkListenerEvents. 
*/ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { - - import EventLoggingListenerSuite._ - private val fileSystem = Utils.getHadoopFileSystem("/") private val allCompressionCodecs = Seq[String]( "org.apache.spark.io.LZFCompressionCodec", @@ -57,6 +55,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Verify special files exist") { testSpecialFilesExist() testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo")) + } + + test("Verify special files exist with compression") { allCompressionCodecs.foreach { codec => testSpecialFilesExist(compressionCodec = Some(codec)) } @@ -65,6 +66,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Parse event logging info") { testParsingLogInfo() testParsingLogInfo(logDirPath = Some("/tmp/spark-foo")) + } + + test("Parse event logging info with compression") { allCompressionCodecs.foreach { codec => testParsingLogInfo(compressionCodec = Some(codec)) } @@ -73,6 +77,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Basic event logging") { testEventLogging() testEventLogging(logDirPath = Some("/tmp/spark-foo")) + } + + test("Basic event logging with compression") { allCompressionCodecs.foreach { codec => testEventLogging(compressionCodec = Some(codec)) } @@ -81,6 +88,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("End-to-end event logging") { testApplicationEventLogging() testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo")) + } + + test("End-to-end event logging with compression") { allCompressionCodecs.foreach { codec => testApplicationEventLogging(compressionCodec = Some(codec)) } @@ -91,6 +101,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * Actual test logic * * ----------------- */ + import EventLoggingListenerSuite._ + /** * Test whether names of special files are correctly identified and parsed. */ @@ -152,10 +164,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { // Verify logging directory exists val conf = getLoggingConf(logDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) - eventLogger.start() val logPath = new Path(eventLogger.logDir) val logDir = fileSystem.getFileStatus(logPath) assert(logDir.isDir) + eventLogger.start() // Verify special files are as expected before stop() var logFiles = fileSystem.listStatus(logPath) @@ -228,13 +240,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { // Verify file contains exactly the two events logged val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) assert(eventLoggingInfo.logPaths.size > 0) - val fileStream = { - val stream = fileSystem.open(eventLoggingInfo.logPaths.head) - eventLoggingInfo.compressionCodec.map { codec => - codec.compressedInputStream(stream) - }.getOrElse(stream) - } - val lines = Source.fromInputStream(fileStream).getLines().toSeq + val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) assert(lines.size === 2) assert(lines(0).contains("SparkListenerApplicationStart")) assert(lines(1).contains("SparkListenerApplicationEnd")) @@ -272,28 +278,38 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { /** * Assert that all of the specified events are logged by the given EventLoggingListener. 
*/ - private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) { + private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) { val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) - val logPath = eventLoggingInfo.logPaths.head - val fileStream = { - val stream = fileSystem.open(logPath) - eventLoggingInfo.compressionCodec.map { codec => - codec.compressedInputStream(stream) - }.getOrElse(stream) - } - val lines = Source.fromInputStream(fileStream).getLines() + assert(eventLoggingInfo.logPaths.size > 0) + val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) val eventSet = mutable.Set(events: _*) lines.foreach { line => eventSet.foreach { event => - if (line.contains(event) && - JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) { - eventSet.remove(event) + if (line.contains(event)) { + val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) + val eventType = Utils.getFormattedClassName(parsedEvent) + if (eventType == event) { + eventSet.remove(event) + } } } } assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) } + /** + * Read all lines from the file specified by the given path. + * If a compression codec is specified, use it to read the file. + */ + private def getLines(filePath: Path, compressionCodec: Option[CompressionCodec]): Seq[String] = { + val fstream = fileSystem.open(filePath) + val cstream = + compressionCodec.map { codec => + codec.compressedInputStream(fstream) + }.getOrElse(fstream) + Source.fromInputStream(cstream).getLines().toSeq + } + /** * A listener that asserts certain events are logged by the given EventLoggingListener. * This is necessary because events are posted asynchronously in a different thread. 
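The assertion rewritten above only counts a line toward an expected event once its JSON parses back to that event type, rather than trusting the substring match alone; the expected names are produced from the companion objects the same way (see the next hunk), so both sides of the comparison go through Utils.getFormattedClassName. A small sketch of that symmetry, assuming it runs inside the suite and using an arbitrary timestamp:

    // Sketch only: the companion object and a round-tripped instance format to the same name.
    val expected = Utils.getFormattedClassName(SparkListenerApplicationEnd)
    val roundTripped = JsonProtocol.sparkEventFromJson(
      JsonProtocol.sparkEventToJson(SparkListenerApplicationEnd(42L)))
    assert(Utils.getFormattedClassName(roundTripped) === expected)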
@@ -304,28 +320,30 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { var appEnded = false override def onJobStart(jobStart: SparkListenerJobStart) { - assertEventExists(eventLogger, Seq[String]( - "SparkListenerApplicationStart", - "SparkListenerBlockManagerAdded", - "SparkListenerEnvironmentUpdate" + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerApplicationStart), + Utils.getFormattedClassName(SparkListenerBlockManagerAdded), + Utils.getFormattedClassName(SparkListenerEnvironmentUpdate) )) jobStarted = true } override def onJobEnd(jobEnd: SparkListenerJobEnd) { - assertEventExists(eventLogger, Seq[String]( - "SparkListenerJobStart", - "SparkListenerJobEnd", - "SparkListenerStageSubmitted", - "SparkListenerStageCompleted", - "SparkListenerTaskStart", - "SparkListenerTaskEnd" + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerJobStart), + Utils.getFormattedClassName(SparkListenerJobEnd), + Utils.getFormattedClassName(SparkListenerStageSubmitted), + Utils.getFormattedClassName(SparkListenerStageCompleted), + Utils.getFormattedClassName(SparkListenerTaskStart), + Utils.getFormattedClassName(SparkListenerTaskEnd) )) jobEnded = true } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd")) + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerApplicationEnd) + )) appEnded = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 7c61210469a8f..d1a0efe867231 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} /** - * Test for whether ReplayListenerBus replays events from logs correctly. + * Test whether ReplayListenerBus replays events from logs correctly. */ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { private val fileSystem = Utils.getHadoopFileSystem("/") diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index 4fef6583fe6a5..f477b245d8b37 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.SparkConf import org.apache.spark.io.CompressionCodec /** - * Tests for writing files through the FileLogger. + * Test writing files through the FileLogger. 
*/ class FileLoggerSuite extends FunSuite with BeforeAndAfter { private val fileSystem = Utils.getHadoopFileSystem("/") From 62010fd3a5f9ca08eedcf6fb41f37d6094b89295 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 29 Apr 2014 14:25:11 -0700 Subject: [PATCH 08/11] More cleanup (renaming variables, updating comments etc) --- .../spark/scheduler/EventLoggingListenerSuite.scala | 8 +++++--- .../org/apache/spark/scheduler/ReplayListenerSuite.scala | 6 +++--- .../scala/org/apache/spark/util/FileLoggerSuite.scala | 3 --- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 0ffe7768fff62..d3d8eef514f20 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -240,7 +240,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { // Verify file contains exactly the two events logged val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) assert(eventLoggingInfo.logPaths.size > 0) - val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) assert(lines.size === 2) assert(lines(0).contains("SparkListenerApplicationStart")) assert(lines(1).contains("SparkListenerApplicationEnd")) @@ -281,7 +281,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) { val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) assert(eventLoggingInfo.logPaths.size > 0) - val lines = getLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) val eventSet = mutable.Set(events: _*) lines.foreach { line => eventSet.foreach { event => @@ -301,7 +301,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * Read all lines from the file specified by the given path. * If a compression codec is specified, use it to read the file. 
*/ - private def getLines(filePath: Path, compressionCodec: Option[CompressionCodec]): Seq[String] = { + private def readFileLines( + filePath: Path, + compressionCodec: Option[CompressionCodec]): Seq[String] = { val fstream = fileSystem.open(filePath) val cstream = compressionCodec.map { codec => diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d1a0efe867231..4f04893ef8239 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -119,16 +119,16 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { val codec = codecName.map(getCompressionCodec) val applications = fileSystem.listStatus(new Path(logDir)) assert(applications != null && applications.size > 0) - val eventLogDir = - applications.filter(_.getPath.getName.startsWith("test-replay")).sortBy(_.getAccessTime).last + val eventLogDir = applications.sortBy(_.getAccessTime).last assert(eventLogDir.isDir) val logFiles = fileSystem.listStatus(eventLogDir.getPath) assert(logFiles != null && logFiles.size > 0) val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_")) assert(logFile.isDefined) + val logFilePath = logFile.get.getPath // Replay events - val replayer = new ReplayListenerBus(Seq(logFile.get.getPath), fileSystem, codec) + val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) val eventMonster = new EventMonster(conf) replayer.addListener(eventMonster) replayer.replay() diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index f477b245d8b37..f50e3103ffbd4 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -123,9 +123,6 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { } else { new FileLogger(logDir, conf) } - assert(fileSystem.exists(logDirPath)) - assert(fileSystem.getFileStatus(logDirPath).isDir) - assert(fileSystem.listStatus(logDirPath).size === 0) logger.newFile("Jean_Valjean") logger.logLine("Who am I?") From 2b521517adc8170855b9b0bd61d2b042f513c2eb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 29 Apr 2014 20:12:13 -0700 Subject: [PATCH 09/11] Remove unnecessary file delete + add a comment --- .../test/scala/org/apache/spark/util/FileLoggerSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index f50e3103ffbd4..9b16c71284ad1 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -42,7 +42,6 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { after { Try { fileSystem.delete(logDirPath, true) } - Try { fileSystem.delete(new Path("falafel"), true) } } test("Simple logging") { @@ -135,6 +134,10 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...") } + /** + * Read the content of the file specified by the given path. + * If a compression codec is specified, use it to read the file. 
+ */ private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = { val fstream = fileSystem.open(logPath) val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream) From 2d5daf897dc95849a1f59e8a14c2357dfddc609c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 30 Apr 2014 17:48:31 -0700 Subject: [PATCH 10/11] Use temp directory provided by the OS rather than /tmp --- .../scheduler/EventLoggingListener.scala | 1 + .../org/apache/spark/util/FileLogger.scala | 7 +++- .../scala/org/apache/spark/util/Utils.scala | 9 +++++ .../scheduler/EventLoggingListenerSuite.scala | 39 +++++++------------ .../spark/scheduler/ReplayListenerSuite.scala | 17 ++++---- .../apache/spark/util/FileLoggerSuite.scala | 33 +++++++++------- 6 files changed, 57 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a87da52ed3ae7..7968a0691db10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -68,6 +68,7 @@ private[spark] class EventLoggingListener( * If compression is used, log a file that indicates which compression library is used. */ def start() { + logger.start() logInfo("Logging events to %s".format(logDir)) if (shouldCompress) { val codec = diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 1e3701ff1a5ac..0e6d21b22023a 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -63,7 +63,12 @@ private[spark] class FileLogger( private var writer: Option[PrintWriter] = None - createLogDir() + /** + * Start this logger by creating the logging directory. + */ + def start() { + createLogDir() + } /** * Create a logging directory with the given path. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 79f314c8dd36c..965ec5f905eca 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1062,6 +1062,15 @@ private[spark] object Utils extends Logging { getHadoopFileSystem(new URI(path)) } + /** + * Return the absolute path of a file in the given directory. + */ + def getFilePath(dir: File, fileName: String): Path = { + assert(dir.isDirectory) + val path = new File(dir, fileName).getAbsolutePath + new Path(path) + } + /** * Indicates whether Spark is currently running unit tests. 
*/ diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index d3d8eef514f20..95f5bcd855665 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import scala.io.Source import scala.util.Try +import com.google.common.io.Files import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -42,10 +43,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" ) + private val testDir = Files.createTempDir() + private val logDirPath = Utils.getFilePath(testDir, "spark-events") after { - Try { fileSystem.delete(new Path("/tmp/spark-events"), true) } - Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) } + Try { fileSystem.delete(logDirPath, true) } } test("Parse names of special files") { @@ -54,7 +56,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Verify special files exist") { testSpecialFilesExist() - testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo")) } test("Verify special files exist with compression") { @@ -65,7 +66,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Parse event logging info") { testParsingLogInfo() - testParsingLogInfo(logDirPath = Some("/tmp/spark-foo")) } test("Parse event logging info with compression") { @@ -76,7 +76,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("Basic event logging") { testEventLogging() - testEventLogging(logDirPath = Some("/tmp/spark-foo")) } test("Basic event logging with compression") { @@ -87,7 +86,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { test("End-to-end event logging") { testApplicationEventLogging() - testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo")) } test("End-to-end event logging with compression") { @@ -143,9 +141,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * also exist. Only after the application has completed does the test expect the application * completed file to be present. */ - private def testSpecialFilesExist( - logDirPath: Option[String] = None, - compressionCodec: Option[String] = None) { + private def testSpecialFilesExist(compressionCodec: Option[String] = None) { def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) { val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0 @@ -164,10 +160,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { // Verify logging directory exists val conf = getLoggingConf(logDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) + eventLogger.start() val logPath = new Path(eventLogger.logDir) + assert(fileSystem.exists(logPath)) val logDir = fileSystem.getFileStatus(logPath) assert(logDir.isDir) - eventLogger.start() // Verify special files are as expected before stop() var logFiles = fileSystem.listStatus(logPath) @@ -186,9 +183,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * This includes whether it returns the correct Spark version, compression codec (if any), * and the application's completion status. 
*/ - private def testParsingLogInfo( - logDirPath: Option[String] = None, - compressionCodec: Option[String] = None) { + private def testParsingLogInfo(compressionCodec: Option[String] = None) { def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) { assert(info.logPaths.size > 0) @@ -221,9 +216,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * This creates two simple events, posts them to the EventLoggingListener, and verifies that * exactly these two events are logged in the expected file. */ - private def testEventLogging( - logDirPath: Option[String] = None, - compressionCodec: Option[String] = None) { + private def testEventLogging(compressionCodec: Option[String] = None) { val conf = getLoggingConf(logDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) val listenerBus = new LiveListenerBus @@ -253,14 +246,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * Test end-to-end event logging functionality in an application. * This runs a simple Spark job and asserts that the expected events are logged when expected. */ - private def testApplicationEventLogging( - logDirPath: Option[String] = None, - compressionCodec: Option[String] = None) { + private def testApplicationEventLogging(compressionCodec: Option[String] = None) { val conf = getLoggingConf(logDirPath, compressionCodec) val sc = new SparkContext("local", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get - val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR) + val expectedLogDir = logDirPath.toString assert(eventLogger.logDir.startsWith(expectedLogDir)) // Begin listening for events that trigger asserts @@ -395,15 +386,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { object EventLoggingListenerSuite { /** Get a SparkConf with event logging enabled. 
*/ - def getLoggingConf( - logDir: Option[String] = None, - compressionCodec: Option[String] = None) = { + def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = { val conf = new SparkConf conf.set("spark.eventLog.enabled", "true") conf.set("spark.eventLog.testing", "true") - logDir.foreach { dir => - conf.set("spark.eventLog.dir", dir) - } + conf.set("spark.eventLog.dir", logDir.toString) compressionCodec.foreach { codec => conf.set("spark.eventLog.compress", "true") conf.set("spark.io.compression.codec", codec) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 4f04893ef8239..d1fe1fc348961 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -21,7 +21,7 @@ import java.io.PrintWriter import scala.util.Try -import org.apache.hadoop.fs.Path +import com.google.common.io.Files import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -39,10 +39,11 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" ) + private val testDir = Files.createTempDir() after { - Try { fileSystem.delete(new Path("/tmp/events.txt"), true) } - Try { fileSystem.delete(new Path("/tmp/test-replay"), true) } + Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) } + Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) } } test("Simple replay") { @@ -76,7 +77,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { * Test simple replaying of events. */ private def testSimpleReplay(codecName: Option[String] = None) { - val logFilePath = new Path("/tmp/events.txt") + val logFilePath = Utils.getFilePath(testDir, "events.txt") val codec = codecName.map(getCompressionCodec) val fstream = fileSystem.create(logFilePath) val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) @@ -87,7 +88,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) writer.close() val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) - val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName) + val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName) val eventMonster = new EventMonster(conf) replayer.addListener(eventMonster) replayer.replay() @@ -104,8 +105,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { * assumption that the event logging behavior is correct (tested in a separate suite). 
*/ private def testApplicationReplay(codecName: Option[String] = None) { - val logDir = "/tmp/test-replay" - val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName) + val logDirPath = Utils.getFilePath(testDir, "test-replay") + val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName) val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf) // Run a few jobs @@ -117,7 +118,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { // Prepare information needed for replay val codec = codecName.map(getCompressionCodec) - val applications = fileSystem.listStatus(new Path(logDir)) + val applications = fileSystem.listStatus(logDirPath) assert(applications != null && applications.size > 0) val eventLogDir = applications.sortBy(_.getAccessTime).last assert(eventLogDir.isDir) diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index 9b16c71284ad1..b4577d23fbb8c 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -22,6 +22,7 @@ import java.io.IOException import scala.io.Source import scala.util.Try +import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfter, FunSuite} @@ -37,8 +38,9 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" ) - private val logDir = "/tmp/test-file-logger" - private val logDirPath = new Path(logDir) + private val testDir = Files.createTempDir() + private val logDirPath = Utils.getFilePath(testDir, "test-file-logger") + private val logDirPathString = logDirPath.toString after { Try { fileSystem.delete(logDirPath, true) } @@ -66,12 +68,14 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { test("Logging when directory already exists") { // Create the logging directory multiple times - new FileLogger(logDir, new SparkConf, overwrite = true) - new FileLogger(logDir, new SparkConf, overwrite = true) - new FileLogger(logDir, new SparkConf, overwrite = true) + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() // If overwrite is not enabled, an exception should be thrown - intercept[IOException] { new FileLogger(logDir, new SparkConf, overwrite = false) } + intercept[IOException] { + new FileLogger(logDirPathString, new SparkConf, overwrite = false).start() + } } @@ -87,10 +91,11 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { val codec = codecName.map { c => CompressionCodec.createCodec(conf) } val logger = if (codecName.isDefined) { - new FileLogger(logDir, conf, compress = true) + new FileLogger(logDirPathString, conf, compress = true) } else { - new FileLogger(logDir, conf) + new FileLogger(logDirPathString, conf) } + logger.start() assert(fileSystem.exists(logDirPath)) assert(fileSystem.getFileStatus(logDirPath).isDir) assert(fileSystem.listStatus(logDirPath).size === 0) @@ -118,20 +123,20 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { val codec = codecName.map { c => CompressionCodec.createCodec(conf) } val logger = if (codecName.isDefined) { - new FileLogger(logDir, conf, compress = true) + new FileLogger(logDirPathString, conf, compress = true) } else { - new 
FileLogger(logDir, conf) + new FileLogger(logDirPathString, conf) } - + logger.start() logger.newFile("Jean_Valjean") logger.logLine("Who am I?") logger.logLine("Destiny?") logger.newFile("John_Valjohn") logger.logLine("One") - logger.logLine("Two three four...") + logger.logLine("Two three...") logger.close() - assert(readFileContent(new Path(logDir + "/Jean_Valjean"), codec) === "Who am I?\nDestiny?") - assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...") + assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?") + assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...") } /** From c3afcea43b8153bc53c74a7853102edcabb6ef5f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 30 Apr 2014 17:54:03 -0700 Subject: [PATCH 11/11] Compromise --- .../test/scala/org/apache/spark/util/FileLoggerSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index b4577d23fbb8c..f675e1e5b4981 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -134,9 +134,14 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { logger.newFile("John_Valjohn") logger.logLine("One") logger.logLine("Two three...") + logger.newFile("Wolverine") + logger.logLine("There was a time") + logger.logLine("A time when our enemies knew honor.") logger.close() assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?") assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...") + assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) === + "There was a time\nA time when our enemies knew honor.") } /**