Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus #591

Closed · wants to merge 14 commits · Changes from all commits
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

@@ -18,13 +18,16 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}

@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
     logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)

Review thread on the FileLogger construction above:

Contributor: I don't think this was over the line limit before; wouldn't it have failed our style checks?

Contributor (author): I renamed it to sparkConf, because there's also a hadoopConf now.

Contributor: Ah, I see. I misread this as only a line-break change.
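Because hadoopConf now defaults to SparkHadoopUtil.get.newConfiguration(), a test can construct the listener from a SparkConf alone. A minimal sketch of that setup, assuming test code in a package under org.apache.spark so the private[spark] class is visible (the app name and log directory are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.EventLoggingListener

val conf = new SparkConf()
  .set("spark.eventLog.dir", "/tmp/spark-events-test") // illustrative scratch dir
  .set("spark.eventLog.testing", "true")               // enables the loggedEvents hook
val listener = new EventLoggingListener("test-app", conf) // hadoopConf defaulted
listener.start()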
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(

   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
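With spark.eventLog.testing enabled, every event that reaches logEvent is also retained in loggedEvents as its json4s JValue, so a test can assert on the serialized form without reading log files back. A sketch of that pattern, continuing from the listener constructed above; it assumes the test sits in org.apache.spark.scheduler so it can read the private[scheduler] buffer, and that the listener forwards onApplicationEnd to logEvent (the event and assertions are illustrative):

import org.json4s.jackson.JsonMethods.{compact, render}
import org.apache.spark.scheduler.SparkListenerApplicationEnd

listener.onApplicationEnd(SparkListenerApplicationEnd(System.currentTimeMillis))
assert(listener.loggedEvents.nonEmpty)
// JsonProtocol tags each event with its type, so the name should appear in the JSON.
assert(compact(render(listener.loggedEvents.last)).contains("SparkListenerApplicationEnd"))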
@@ -121,13 +132,12 @@
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
    */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
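Making postToAll public is what lets replay code, and the new tests, drive a listener bus directly instead of going through a live scheduler. A sketch of the pattern; the counting listener is a hypothetical test double, and the example assumes the trait's addListener and a concrete sparkListeners buffer, as the match above implies:

import org.apache.spark.scheduler._

// Hypothetical test double: counts the job-end events it receives.
class CountingListener extends SparkListener {
  var jobsEnded = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd) { jobsEnded += 1 }
}

val bus = new SparkListenerBus {} // anonymous instance, assuming no abstract members
val counter = new CountingListener
bus.addListener(counter)
bus.postToAll(SparkListenerJobEnd(0, JobSucceeded)) // event fields illustrative
assert(counter.jobsEnded == 1)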
28 changes: 17 additions & 11 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.util
 
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date

@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 /**

@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
   var fileIndex = 0
 
   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }
 
   /**
    * Create a logging directory with the given path.
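Moving createLogDir() out of the constructor and behind an explicit start() gives FileLogger a two-phase lifecycle: a test can construct one cheaply and only touch the file system when it chooses to. A sketch of the intended call sequence, using only methods visible in this diff, from code inside org.apache.spark (the directory and file names are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.util.FileLogger

val fileLogger = new FileLogger("/tmp/file-logger-test", new SparkConf) // remaining params defaulted
fileLogger.start()                 // creates the log directory; construction no longer does
fileLogger.newFile("EVENT_LOG_0")  // illustrative file name
fileLogger.logLine("some event")
fileLogger.flush()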
@@ -83,7 +89,7 @@
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
         fileSystem.setPermission(path, dirPermissions.get)
       }
     }
@@ -92,14 +98,14 @@
   /**
    * Create a new writer for the file identified by the given path.
    * If the permissions are not passed in, it will default to use the permissions
-   * (dirpermissions) used when class was instantiated.
+   * (dirPermissions) used when class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
     val path = new Path(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@
         hadoopDataStream.get
       }
 
-    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -127,7 +133,7 @@
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }
 
   def UUIDFromJson(json: JValue): UUID = {
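The rewrite leans on Utils.jsonOption to map json4s's JNothing to None, and swaps map for foreach on the side-effecting loop. Note it also changes behavior: an absent field now yields null rather than an empty Properties. A plausible shape for the helper, stated as an assumption rather than quoted from this PR:

import org.json4s.JsonAST.{JNothing, JValue}

// Assumed implementation: None for a missing field, Some otherwise.
def jsonOption(json: JValue): Option[JValue] = json match {
  case JNothing => None
  case value => Some(value)
}

The closure binds value but still reads json; since jsonOption returns its input wrapped in Some, the two refer to the same JValue.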
18 changes: 14 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,15 +1062,25 @@
   }
 
   /**
-   * return true if this is Windows.
+   * Return the absolute path of a file in the given directory.
    */
-  def isWindows = Option(System.getProperty("os.name")).
-    map(_.startsWith("Windows")).getOrElse(false)
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
 
+  /**
+   * Return true if this is Windows.
+   */
+  def isWindows = {
+    Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+  }
 
   /**
    * Indicates whether Spark is currently running unit tests.
    */
-  private[spark] def isTesting = {
+  def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
 }
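getFilePath is a small bridge between the java.io.File handles tests tend to create and the Hadoop Path the logging code consumes. A quick sketch of its intended use, from test code inside org.apache.spark since Utils is private[spark] (the temp directory handling is illustrative):

import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils

val testDir = new File(System.getProperty("java.io.tmpdir"), "spark-logger-test")
testDir.mkdirs() // getFilePath asserts dir.isDirectory, so create it first
val logPath: Path = Utils.getFilePath(testDir, "EVENT_LOG_0")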