Skip to content

Commit

Permalink
Tests for FileLogger + delete file after tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 29, 2014
1 parent 187bb25 commit ab66a84
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
private[spark] class EventLoggingListener(
appName: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
extends SparkListener with Logging {

import EventLoggingListener._

def this(appName: String, sparkConf: SparkConf) = {
this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration())
}

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
Expand All @@ -67,7 +63,8 @@ private[spark] class EventLoggingListener(
def start() {
logInfo("Logging events to %s".format(logDir))
if (shouldCompress) {
val codec = sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
val codec =
sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec

/**
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.io.CompressionCodec
private[spark] class FileLogger(
logDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration,
hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true)
Expand Down Expand Up @@ -116,7 +117,7 @@ private[spark] class FileLogger(
val writeInfo = if (!withTime) {
msg
} else {
val date = new Date(System.currentTimeMillis())
val date = new Date(System.currentTimeMillis)
dateFormat.get.format(date) + ": " + msg
}
writer.foreach(_.print(writeInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.scheduler

import scala.collection.mutable
import scala.io.Source
import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.jackson.JsonMethods._
Expand All @@ -41,6 +42,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
"org.apache.spark.io.SnappyCompressionCodec"
)

after {
Try { fileSystem.delete(new Path("/tmp/spark-events"), true) }
Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) }
}

test("Parse names of special files") {
testParsingFileName()
}
Expand Down Expand Up @@ -376,4 +382,4 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
conf
}

}
}
153 changes: 153 additions & 0 deletions core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.IOException

import scala.io.Source
import scala.util.Try

import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

/**
* Tests for writing files through the FileLogger.
*/
class FileLoggerSuite extends FunSuite with BeforeAndAfter {
private val fileSystem = Utils.getHadoopFileSystem("/")
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
private val logDir = "/tmp/test-file-logger"
private val logDirPath = new Path(logDir)

after {
Try { fileSystem.delete(logDirPath, true) }
Try { fileSystem.delete(new Path("falafel"), true) }
}

test("simple write") {
testSingleFile()
}

test ("simple write with compression") {
allCompressionCodecs.foreach { codec =>
testSingleFile(Some(codec))
}
}

test("multiple files") {
testMultipleFiles()
}

test("multiple files with compression") {
allCompressionCodecs.foreach { codec =>
testMultipleFiles(Some(codec))
}
}

test("logging when directory already exists") {
// Create the logging directory multiple times
new FileLogger(logDir, new SparkConf, overwrite = true)
new FileLogger(logDir, new SparkConf, overwrite = true)
new FileLogger(logDir, new SparkConf, overwrite = true)

// If overwrite is not enabled, an exception should be thrown
intercept[IOException] { new FileLogger(logDir, new SparkConf, overwrite = false) }
}


/* ----------------- *
* Actual test logic *
* ----------------- */

/**
* Test logging to a single file.
*/
private def testSingleFile(codecName: Option[String] = None) {
val conf = getLoggingConf(codecName)
val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
val logger =
if (codecName.isDefined) {
new FileLogger(logDir, conf, compress = true)
} else {
new FileLogger(logDir, conf)
}
assert(fileSystem.exists(logDirPath))
assert(fileSystem.getFileStatus(logDirPath).isDir)
assert(fileSystem.listStatus(logDirPath).size === 0)

logger.newFile()
val files = fileSystem.listStatus(logDirPath)
assert(files.size === 1)
val firstFile = files.head
val firstFilePath = firstFile.getPath

logger.log("hello")
logger.flush()
assert(readFileContent(firstFilePath, codec) === "hello")

logger.log(" world")
logger.close()
assert(readFileContent(firstFilePath, codec) === "hello world")
}

/**
* Test logging to multiple files.
*/
private def testMultipleFiles(codecName: Option[String] = None) {
val conf = getLoggingConf(codecName)
val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
val logger =
if (codecName.isDefined) {
new FileLogger(logDir, conf, compress = true)
} else {
new FileLogger(logDir, conf)
}
assert(fileSystem.exists(logDirPath))
assert(fileSystem.getFileStatus(logDirPath).isDir)
assert(fileSystem.listStatus(logDirPath).size === 0)

logger.newFile("Jean_Valjean")
logger.logLine("Who am I?")
logger.logLine("Destiny?")
logger.newFile("John_Valjohn")
logger.logLine("One")
logger.logLine("Two three four...")
logger.close()
assert(readFileContent(new Path(logDir + "/Jean_Valjean"), codec) === "Who am I?\nDestiny?")
assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...")
}

private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
val fstream = fileSystem.open(logPath)
val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
Source.fromInputStream(cstream).getLines().mkString("\n")
}

private def getLoggingConf(codecName: Option[String]) = {
val conf = new SparkConf
codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
conf
}

}

0 comments on commit ab66a84

Please sign in to comment.