Use temp directory provided by the OS rather than /tmp
andrewor14 committed May 1, 2014
1 parent 2b52151 commit 2d5daf8
Showing 6 changed files with 57 additions and 49 deletions.
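In short, each suite now creates its working directory with Guava's `Files.createTempDir()` instead of writing under a hardcoded `/tmp`. A minimal sketch of the pattern (names here are illustrative, not from the diff):

```scala
import java.io.File
import com.google.common.io.Files

// Before: tests assumed a Unix-style /tmp that may not exist, may not be
// writable, and can collide across concurrent test runs.
val oldLogDir = "/tmp/spark-events"

// After: ask the OS for a fresh, unique temporary directory instead.
val testDir: File = Files.createTempDir() // created under java.io.tmpdir
val newLogDir: String = new File(testDir, "spark-events").getAbsolutePath
```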
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

@@ -68,6 +68,7 @@ private[spark] class EventLoggingListener(
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec =
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -63,7 +63,12 @@ private[spark] class FileLogger(

   private var writer: Option[PrintWriter] = None

-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }

   /**
    * Create a logging directory with the given path.
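The `FileLogger` hunk above moves directory creation out of the constructor and into an explicit `start()`, which `EventLoggingListener.start()` now calls first (see the first hunk). A simplified before/after sketch of that lifecycle, with a stand-in `mkdirs`-based body rather than the real implementation:

```scala
import java.io.File

// Before (sketch): constructing the logger eagerly created the directory.
class EagerFileLogger(logDir: String) {
  createLogDir() // side effect runs as part of `new`
  private def createLogDir() { new File(logDir).mkdirs() }
}

// After (sketch): creation waits for an explicit start(), so merely
// constructing a logger no longer touches the filesystem.
class LazyFileLogger(logDir: String) {
  def start() { createLogDir() }
  private def createLogDir() { new File(logDir).mkdirs() }
}
```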
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,6 +1062,15 @@ private[spark] object Utils extends Logging {
     getHadoopFileSystem(new URI(path))
   }

+  /**
+   * Return the absolute path of a file in the given directory.
+   */
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
   /**
    * Indicates whether Spark is currently running unit tests.
    */
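Taken together with Guava, the new `Utils.getFilePath` helper gives tests a Hadoop `Path` rooted in an OS-provided directory. A hypothetical usage sketch from inside Spark's own code (`Utils` is `private[spark]`); the printed location is OS- and run-dependent:

```scala
import java.io.File
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils

val testDir: File = Files.createTempDir()
val eventsPath: Path = Utils.getFilePath(testDir, "spark-events")
// eventsPath wraps an absolute path such as /tmp/1398902091234-0/spark-events
// (the exact prefix and directory name vary by OS and run)
```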
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try

+import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -42,10 +43,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "spark-events")

   after {
-    Try { fileSystem.delete(new Path("/tmp/spark-events"), true) }
-    Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) }
+    Try { fileSystem.delete(logDirPath, true) }
   }

   test("Parse names of special files") {
@@ -54,7 +56,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Verify special files exist") {
     testSpecialFilesExist()
-    testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Verify special files exist with compression") {
@@ -65,7 +66,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Parse event logging info") {
     testParsingLogInfo()
-    testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Parse event logging info with compression") {
@@ -76,7 +76,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Basic event logging") {
     testEventLogging()
-    testEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Basic event logging with compression") {
@@ -87,7 +86,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("End-to-end event logging") {
     testApplicationEventLogging()
-    testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }

   test("End-to-end event logging with compression") {
@@ -143,9 +141,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * also exist. Only after the application has completed does the test expect the application
    * completed file to be present.
    */
-  private def testSpecialFilesExist(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {

     def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
       val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
@@ -164,10 +160,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
+    assert(fileSystem.exists(logPath))
     val logDir = fileSystem.getFileStatus(logPath)
     assert(logDir.isDir)
-    eventLogger.start()

     // Verify special files are as expected before stop()
     var logFiles = fileSystem.listStatus(logPath)
@@ -186,9 +183,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This includes whether it returns the correct Spark version, compression codec (if any),
    * and the application's completion status.
    */
-  private def testParsingLogInfo(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testParsingLogInfo(compressionCodec: Option[String] = None) {

     def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
       assert(info.logPaths.size > 0)
@@ -221,9 +216,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This creates two simple events, posts them to the EventLoggingListener, and verifies that
    * exactly these two events are logged in the expected file.
    */
-  private def testEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     val listenerBus = new LiveListenerBus
@@ -253,14 +246,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * Test end-to-end event logging functionality in an application.
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
-  private def testApplicationEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
+    val expectedLogDir = logDirPath.toString
     assert(eventLogger.logDir.startsWith(expectedLogDir))

     // Begin listening for events that trigger asserts
@@ -395,15 +386,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 object EventLoggingListenerSuite {

   /** Get a SparkConf with event logging enabled. */
-  def getLoggingConf(
-      logDir: Option[String] = None,
-      compressionCodec: Option[String] = None) = {
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    logDir.foreach { dir =>
-      conf.set("spark.eventLog.dir", dir)
-    }
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)
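One caveat worth knowing: `Files.createTempDir()` does not register the directory for deletion on JVM exit, so cleanup stays in each suite's `after` block. The suites in this commit delete the specific paths they create; a variant that removes the entire temp directory might look like this (a sketch, not the committed code; `TempDirCleanupSuite` is hypothetical):

```scala
import scala.util.Try
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest.{BeforeAndAfter, FunSuite}

class TempDirCleanupSuite extends FunSuite with BeforeAndAfter {
  private val fileSystem = FileSystem.getLocal(new Configuration())
  private val testDir = Files.createTempDir()

  after {
    // Recursively delete the whole OS-provided temp directory; Try swallows
    // the failure if nothing was ever written there.
    Try { fileSystem.delete(new Path(testDir.getAbsolutePath), true) }
  }

  test("temp dir exists until cleanup") {
    assert(testDir.isDirectory)
  }
}
```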
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

@@ -21,7 +21,7 @@ import java.io.PrintWriter

 import scala.util.Try

-import org.apache.hadoop.fs.Path
+import com.google.common.io.Files
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -39,10 +39,11 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
+  private val testDir = Files.createTempDir()

   after {
-    Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
-    Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
   }

   test("Simple replay") {
@@ -76,7 +77,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * Test simple replaying of events.
    */
   private def testSimpleReplay(codecName: Option[String] = None) {
-    val logFilePath = new Path("/tmp/events.txt")
+    val logFilePath = Utils.getFilePath(testDir, "events.txt")
     val codec = codecName.map(getCompressionCodec)
     val fstream = fileSystem.create(logFilePath)
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
@@ -87,7 +88,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
     writer.close()
     val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
-    val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
     val eventMonster = new EventMonster(conf)
     replayer.addListener(eventMonster)
     replayer.replay()
@@ -104,8 +105,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * assumption that the event logging behavior is correct (tested in a separate suite).
    */
   private def testApplicationReplay(codecName: Option[String] = None) {
-    val logDir = "/tmp/test-replay"
-    val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
+    val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
     val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)

     // Run a few jobs
@@ -117,7 +118,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {

     // Prepare information needed for replay
     val codec = codecName.map(getCompressionCodec)
-    val applications = fileSystem.listStatus(new Path(logDir))
+    val applications = fileSystem.listStatus(logDirPath)
     assert(applications != null && applications.size > 0)
     val eventLogDir = applications.sortBy(_.getAccessTime).last
     assert(eventLogDir.isDir)
33 changes: 19 additions & 14 deletions core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -22,6 +22,7 @@ import java.io.IOException
 import scala.io.Source
 import scala.util.Try

+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -37,8 +38,9 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
-  private val logDir = "/tmp/test-file-logger"
-  private val logDirPath = new Path(logDir)
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+  private val logDirPathString = logDirPath.toString

   after {
     Try { fileSystem.delete(logDirPath, true) }
@@ -66,12 +68,14 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {

   test("Logging when directory already exists") {
     // Create the logging directory multiple times
-    new FileLogger(logDir, new SparkConf, overwrite = true)
-    new FileLogger(logDir, new SparkConf, overwrite = true)
-    new FileLogger(logDir, new SparkConf, overwrite = true)
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()

     // If overwrite is not enabled, an exception should be thrown
-    intercept[IOException] { new FileLogger(logDir, new SparkConf, overwrite = false) }
+    intercept[IOException] {
+      new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+    }
   }

@@ -87,10 +91,11 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
     val logger =
       if (codecName.isDefined) {
-        new FileLogger(logDir, conf, compress = true)
+        new FileLogger(logDirPathString, conf, compress = true)
       } else {
-        new FileLogger(logDir, conf)
+        new FileLogger(logDirPathString, conf)
       }
+    logger.start()
     assert(fileSystem.exists(logDirPath))
     assert(fileSystem.getFileStatus(logDirPath).isDir)
     assert(fileSystem.listStatus(logDirPath).size === 0)
@@ -118,20 +123,20 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
     val logger =
       if (codecName.isDefined) {
-        new FileLogger(logDir, conf, compress = true)
+        new FileLogger(logDirPathString, conf, compress = true)
       } else {
-        new FileLogger(logDir, conf)
+        new FileLogger(logDirPathString, conf)
       }
-
+    logger.start()
     logger.newFile("Jean_Valjean")
     logger.logLine("Who am I?")
     logger.logLine("Destiny?")
     logger.newFile("John_Valjohn")
     logger.logLine("One")
-    logger.logLine("Two three four...")
+    logger.logLine("Two three...")
     logger.close()
-    assert(readFileContent(new Path(logDir + "/Jean_Valjean"), codec) === "Who am I?\nDestiny?")
-    assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...")
+    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
   }

   /**
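The final hunk also replaces string concatenation with Hadoop's two-argument `Path(parent, child)` constructor, which handles the separator itself. A small illustration (hypothetical paths; behavior as documented for `org.apache.hadoop.fs.Path`):

```scala
import org.apache.hadoop.fs.Path

val logDirPath = new Path("/some/log/dir")
val byConcat = new Path(logDirPath.toString + "/Jean_Valjean") // manual separator
val byChild  = new Path(logDirPath, "Jean_Valjean")            // (parent, child) form
assert(byConcat == byChild) // both normalize to /some/log/dir/Jean_Valjean
```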
