Skip to content

Commit

Permalink
Formatting and renaming variables
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 28, 2014
1 parent 769336f commit 187bb25
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{FileLogger, JsonProtocol}

Expand All @@ -39,22 +40,25 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
*/
private[spark] class EventLoggingListener(
appName: String,
conf: SparkConf,
hadoopConfiguration: Configuration)
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = conf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
/**
 * Auxiliary constructor for callers that do not supply a Hadoop configuration.
 * Delegates to the primary constructor, obtaining a fresh Hadoop Configuration
 * from SparkHadoopUtil (imported at the top of this file).
 */
def this(appName: String, sparkConf: SparkConf) = {
this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration())
}

// Whether logged events should be compressed ("spark.eventLog.compress", default false).
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
// Whether an existing log directory may be overwritten ("spark.eventLog.overwrite", default false).
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
// Output buffer size in bytes; the config value is given in KB (default 100 KB).
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
// Base directory for event logs, with any trailing "/" removed so logDir joins cleanly below.
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
// Directory name derived from the app name: spaces, ':' and '/' replaced with '-', lowercased,
// and suffixed with the current time in millis to make concurrent runs unlikely to collide.
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name

private val logger =
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
shouldOverwrite)
new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)

/**
* Begin logging events.
Expand All @@ -63,7 +67,7 @@ private[spark] class EventLoggingListener(
def start() {
logInfo("Logging events to %s".format(logDir))
if (shouldCompress) {
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
val codec = sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.util

import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
Expand All @@ -38,8 +38,8 @@ import org.apache.spark.io.CompressionCodec
*/
private[spark] class FileLogger(
logDir: String,
conf: SparkConf,
hadoopConfiguration: Configuration,
sparkConf: SparkConf,
hadoopConf: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true)
Expand All @@ -53,7 +53,7 @@ private[spark] class FileLogger(
var fileIndex = 0

// Only used if compression is enabled
private lazy val compressionCodec = CompressionCodec.createCodec(conf)
private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)

// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
Expand Down Expand Up @@ -87,8 +87,8 @@ private[spark] class FileLogger(
private def createWriter(fileName: String): PrintWriter = {
val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
val isDefaultLocal = (defaultFs == null || defaultFs == "file")
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"

/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
* Therefore, for local files, use FileOutputStream instead. */
Expand Down

0 comments on commit 187bb25

Please sign in to comment.