Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus
Modifications to Spark core are limited to exposing functionality to test files + minor style fixes.
(728 / 769 lines are from tests)

Author: Andrew Or <[email protected]>

Closes apache#591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
andrewor14 authored and pwendell committed May 2, 2014
1 parent 40cf6d3 commit 394d8cb
Showing 8 changed files with 791 additions and 36 deletions.
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.scheduler

 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}

@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {

   import EventLoggingListener._

-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name

-  private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(

   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }

   // Events that do not trigger a flush
@@ -121,13 +132,12 @@
 }

 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
-
+  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
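The spark.eventLog.testing flag and the loggedEvents buffer added above let the new tests assert on the JSON form of every event that gets written, without re-reading the log files. A rough sketch of how a test might use them, not the actual suite: the SparkListenerApplicationEnd event and its handler, and the temp-directory path, are illustrative assumptions, and loggedEvents is private[scheduler], so the code has to live in that package.

package org.apache.spark.scheduler

import org.apache.spark.SparkConf

object EventLogTestSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .set("spark.eventLog.testing", "true")   // keep an in-memory JSON copy of each logged event
      .set("spark.eventLog.dir", System.getProperty("java.io.tmpdir") + "/spark-events")
    val listener = new EventLoggingListener("test-app", conf)  // hadoopConf now has a default
    listener.start()
    // Assumed for illustration: the listener logs SparkListenerApplicationEnd like its other handlers.
    listener.onApplicationEnd(SparkListenerApplicationEnd(System.currentTimeMillis))
    assert(listener.loggedEvents.nonEmpty)      // the serialized event was captured for inspection
  }
}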
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
    */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
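Making postToAll public is one of the "exposing functionality to test files" changes from the commit message: it presumably lets the new ReplayListenerBus and scheduler tests feed events straight into a listener bus without going through a running SparkContext.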
28 changes: 17 additions & 11 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.util

-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission

 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec

 /**
@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
   var fileIndex = 0

   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)

   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None

   private var writer: Option[PrintWriter] = None

-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }

   /**
    * Create a logging directory with the given path.
@@ -83,7 +89,7 @@ private[spark] class FileLogger(
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
         fileSystem.setPermission(path, dirPermissions.get)
       }
     }
@@ -92,14 +98,14 @@ private[spark] class FileLogger(
   /**
    * Create a new writer for the file identified by the given path.
    * If the permissions are not passed in, it will default to use the permissions
-   * (dirpermissions) used when class was instantiated.
+   * (dirPermissions) used when class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
     val path = new Path(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"

     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@ private[spark] class FileLogger(
       hadoopDataStream.get
     }

-    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -127,7 +133,7 @@ private[spark] class FileLogger(
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))
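With the defaults added above, a FileLogger can be built from just a directory and a SparkConf, but it no longer creates its log directory in the constructor; callers must invoke start() first. A minimal sketch, assuming test code living under org.apache.spark.util (FileLogger is private[spark]) and an illustrative temp path:

package org.apache.spark.util

import org.apache.spark.SparkConf

object FileLoggerSketch {
  def main(args: Array[String]) {
    val dir = System.getProperty("java.io.tmpdir") + "/spark-file-logger-sketch"
    val logger = new FileLogger(dir, new SparkConf)   // hadoopConf, buffer size, etc. use defaults
    logger.start()                                    // creates the log directory (used to happen at construction)
    logger.newFile("EVENT_LOG_0")                     // open a file in that directory
    logger.logLine("""{"Event":"SparkListenerTest"}""")
    logger.flush()
  }
}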
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }

   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }

   def UUIDFromJson(json: JValue): UUID = {
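Note the behavior change in the rewrite above: a missing field now goes through Utils.jsonOption and comes back as null, where the old code returned an empty Properties. A quick sketch of the round trip, assuming JsonProtocol.propertiesToJson as the counterpart serializer and test code under org.apache.spark.util (JsonProtocol is private[spark]):

package org.apache.spark.util

import java.util.Properties

import org.json4s.JsonAST.JNothing

object PropertiesJsonSketch {
  def main(args: Array[String]) {
    val props = new Properties
    props.setProperty("spark.scheduler.pool", "default")
    val json = JsonProtocol.propertiesToJson(props)   // assumed counterpart of propertiesFromJson
    assert(JsonProtocol.propertiesFromJson(json).getProperty("spark.scheduler.pool") == "default")
    assert(JsonProtocol.propertiesFromJson(JNothing) == null)   // absent field now maps to null
  }
}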
18 changes: 14 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,15 +1062,25 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * return true if this is Windows.
+   * Return the absolute path of a file in the given directory.
    */
-  def isWindows = Option(System.getProperty("os.name")).
-    map(_.startsWith("Windows")).getOrElse(false)
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }

   /**
+   * Return true if this is Windows.
+   */
+  def isWindows = {
+    Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+  }
+
+  /**
    * Indicates whether Spark is currently running unit tests.
    */
-  private[spark] def isTesting = {
+  def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
 }
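Utils.getFilePath is a small convenience added mainly for the new suites: it asserts that the directory exists and wraps the file's absolute path in a Hadoop Path. A quick sketch of its use, with an illustrative temp directory (Utils is private[spark], so this assumes code under org.apache.spark):

package org.apache.spark

import java.io.File

import org.apache.spark.util.Utils

object GetFilePathSketch {
  def main(args: Array[String]) {
    val dir = new File(System.getProperty("java.io.tmpdir"))
    val logFile = Utils.getFilePath(dir, "test-event-log")   // returns an org.apache.hadoop.fs.Path
    println(logFile.toUri)
  }
}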
