
Commit

Address Patrick's comments
This mainly involves (1) making event logging configurable and (2) setting the log boundary
at the granularity of an application rather than a job. Other, more minor changes include
variable renames and directly assigning TaskMetrics.updatedBlocks rather than appending to it.
andrewor14 committed Mar 1, 2014
1 parent 2981d61 commit 2fee310
Showing 7 changed files with 40 additions and 28 deletions.
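
For context, here is a minimal sketch of a driver program that opts into the event logging made configurable by this commit. The two spark.eventLog.* property names come from the diff below; the app name, master, and job are purely illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object EventLogExample {
  def main(args: Array[String]) {
    // Enable event logging so the Web UI can be reconstructed after the application finishes.
    val conf = new SparkConf()
      .setAppName("EventLogExample")                   // illustrative
      .setMaster("local[2]")                           // illustrative
      .set("spark.eventLog.enabled", "true")           // defaults to false
      .set("spark.eventLog.dir", "/tmp/spark-events")  // default documented in docs/configuration.md
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000).map(_ * 2).count()       // any job; its events are logged as JSON
    sc.stop()
  }
}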
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,8 +17,6 @@

package org.apache.spark

- import scala.{Option, deprecated}
-
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -80,7 +80,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Update task metrics to include any blocks whose storage status is updated
val metrics = context.taskMetrics
if (metrics != null) {
- metrics.updatedBlocks = Some(updatedBlocks ++ metrics.updatedBlocks.getOrElse(Seq()))
+ metrics.updatedBlocks = Some(updatedBlocks)
}

elements.iterator.asInstanceOf[Iterator[T]]
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -151,7 +151,7 @@ private[spark] class BlockManager(
private def reportAllBlocks() {
logInfo("Reporting " + blockInfo.size + " blocks to the master.")
for ((blockId, info) <- blockInfo) {
- val status = getUpdatedBlockStatus(blockId, info)
+ val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError("Failed to report " + blockId + " to master; giving up.")
return
@@ -251,7 +251,7 @@ private[spark] class BlockManager(
* the block is dropped from memory and possibly added to disk, return the new storage level
* and the updated in-memory and on-disk sizes.
*/
- private def getUpdatedBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
+ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
val (newLevel, inMemSize, onDiskSize) = info.synchronized {
info.level match {
case null =>
@@ -626,7 +626,7 @@ private[spark] class BlockManager(
// and tell the master about it.
marked = true
putBlockInfo.markReady(size)
- val putBlockStatus = getUpdatedBlockStatus(blockId, putBlockInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
@@ -767,7 +767,7 @@ private[spark] class BlockManager(
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
}

- val status = getUpdatedBlockStatus(blockId, info)
+ val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
@@ -812,7 +812,7 @@ private[spark] class BlockManager(
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
- val status = getUpdatedBlockStatus(blockId, info)
+ val status = getCurrentBlockStatus(blockId, info)
reportBlockStatus(blockId, info, status)
}
} else {
@@ -848,7 +848,7 @@ private[spark] class BlockManager(
iterator.remove()
logInfo("Dropped block " + id)
}
- val status = getUpdatedBlockStatus(id, info)
+ val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
}
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -74,7 +74,7 @@ private[spark] class SparkUI(val sc: SparkContext) extends Logging {
private var _gatewayListener: Option[GatewayUISparkListener] = None

def gatewayListener = _gatewayListener.getOrElse {
- val gateway = new GatewayUISparkListener(this, live)
+ val gateway = new GatewayUISparkListener(this, sc)
_gatewayListener = Some(gateway)
gateway
}
@@ -125,6 +125,7 @@ private[spark] class SparkUI(val sc: SparkContext) extends Logging {

def stop() {
server.foreach(_.stop())
+ _gatewayListener.foreach(_.stop())
logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
}

30 changes: 15 additions & 15 deletions core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark.util.FileLogger
import org.apache.spark.util.JsonProtocol
+ import org.apache.spark.SparkContext

private[ui] trait UISparkListener extends SparkListener

@@ -36,16 +37,20 @@ private[ui] trait UISparkListener
*
* (1) If the UI is live, GatewayUISparkListener posts each event to all attached listeners
* then logs it as JSON. This centralizes event logging and avoids having all attached
- * listeners log the events on their own. By default, GatewayUISparkListener logs one
- * file per job, though this needs not be the case.
+ * listeners log the events on their own.
*
* (2) If the UI is rendered from disk, GatewayUISparkListener replays each event deserialized
* from the event logs to all attached listeners.
*/
- private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends SparkListener {
+ private[ui] class GatewayUISparkListener(parent: SparkUI, sc: SparkContext) extends SparkListener {

// Log events only if the UI is live
- private val logger: Option[FileLogger] = if (live) Some(new FileLogger()) else None
+ private val logger: Option[FileLogger] = {
+   if (sc != null && sc.conf.getBoolean("spark.eventLog.enabled", false)) {
+     val logDir = sc.conf.get("spark.eventLog.dir", "/tmp/spark-events")
+     Some(new FileLogger(logDir))
+   } else None
+ }

// Children listeners for which this gateway is responsible
private val listeners = ArrayBuffer[UISparkListener]()
@@ -61,17 +66,14 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends
}
}

- private def startLogger() = logger.foreach(_.start())
- private def closeLogger() = logger.foreach(_.close())

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
listeners.foreach(_.onStageSubmitted(stageSubmitted))
- logEvent(stageSubmitted, flushLogger = true)
+ logEvent(stageSubmitted)
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
listeners.foreach(_.onStageCompleted(stageCompleted))
- logEvent(stageCompleted, flushLogger = true)
+ logEvent(stageCompleted)
}

override def onTaskStart(taskStart: SparkListenerTaskStart) {
@@ -89,14 +91,12 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends

override def onJobStart(jobStart: SparkListenerJobStart) {
listeners.foreach(_.onJobStart(jobStart))
- startLogger()
- logEvent(jobStart)
+ logEvent(jobStart, flushLogger = true)
}

override def onJobEnd(jobEnd: SparkListenerJobEnd) {
listeners.foreach(_.onJobEnd(jobEnd))
- logEvent(jobEnd)
- closeLogger()
+ logEvent(jobEnd, flushLogger = true)
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
@@ -118,10 +118,10 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
listeners.foreach(_.onUnpersistRDD(unpersistRDD))
- // In case logger has not already started, as unpersist may be called between jobs
- startLogger()
logEvent(unpersistRDD, flushLogger = true)
}

+ def stop() = logger.foreach(_.close())
}

/**
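
The doc comment above describes the gateway pattern this file implements: a single listener receives every event, forwards it to its attached children, and, when event logging is enabled, serializes it exactly once so that the children never write logs themselves. A stripped-down, hypothetical sketch of that fan-out follows; the types are illustrative stand-ins, not the real SparkListener, UISparkListener, or FileLogger classes.

import scala.collection.mutable.ArrayBuffer

sealed trait Event                               // stand-in for a Spark listener event
case class JobStart(jobId: Int) extends Event
case class JobEnd(jobId: Int) extends Event

trait Child { def onEvent(event: Event): Unit }  // stand-in for an attached UI listener

class Gateway(log: Option[String => Unit]) {     // log is None when event logging is disabled
  private val children = ArrayBuffer[Child]()
  def register(child: Child) { children += child }

  // Forward to every child first, then record the event exactly once.
  def post(event: Event) {
    children.foreach(_.onEvent(event))
    log.foreach(write => write(event.toString))  // the real code serializes to JSON via JsonProtocol
  }
}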
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -29,8 +29,7 @@ import org.apache.spark.Logging
* @param name An identifier of each FileLogger instance
*/
class FileLogger(
- logDir: String = Option(System.getenv("SPARK_LOG_DIR"))
-   .getOrElse("/tmp/spark-%s".format(System.getProperty("user.name", "user"))),
+ logDir: String,
name: String = String.valueOf(System.currentTimeMillis()))
extends Logging {

16 changes: 15 additions & 1 deletion docs/configuration.md
@@ -444,7 +444,21 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.logConf</td>
<td>false</td>
<td>
- Log the supplied SparkConf as INFO at start of spark context.
+ Whether to log the supplied SparkConf as INFO at start of spark context.
</td>
</tr>
+ <tr>
+ <td>spark.eventLog.enabled</td>
+ <td>false</td>
+ <td>
+ Whether to log spark events, useful for reconstructing the Web UI after the application has finished.
+ </td>
+ </tr>
+ <tr>
+ <td>spark.eventLog.dir</td>
+ <td>/tmp/spark-events</td>
+ <td>
+ Directory in which spark events are logged, if <code>spark.eventLog.enabled</code> is true.
+ </td>
+ </tr>
<tr>
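
Because a SparkConf also picks up any JVM system property that starts with "spark." when it is constructed, the two settings documented above can be supplied without modifying application code; for example, set them before the SparkContext is created (illustrative):

// Equivalent to the SparkConf.set(...) calls shown earlier.
System.setProperty("spark.eventLog.enabled", "true")
System.setProperty("spark.eventLog.dir", "/tmp/spark-events")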

