From ca258a44af514ac6096ce63bcb28922f8aa4d884 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 5 Mar 2014 17:03:12 -0800
Subject: [PATCH] Master UI - add support for reading compressed event logs

In addition to passing the event log directory to ApplicationDescription,
we now also pass the compression codec in use, if logged events are
compressed.
---
 .../spark/deploy/ApplicationDescription.scala |  4 ++-
 .../apache/spark/deploy/master/Master.scala   | 25 +++++++++++++------
 .../apache/spark/io/CompressionCodec.scala    |  5 ++--
 .../scheduler/EventLoggingListener.scala      | 14 ++++++++++-
 .../cluster/SparkDeploySchedulerBackend.scala |  2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala   |  6 ++---
 6 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2ea8a09..15fa8a7679874 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy
 
+import org.apache.spark.scheduler.EventLoggingInfo
+
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
@@ -24,7 +26,7 @@ private[spark] class ApplicationDescription(
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogInfo: Option[EventLoggingInfo] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "")
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7cc68925a5d2f..864f3d9a908b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -619,11 +619,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       waitingApps -= app
 
       // If application events are logged, use them to rebuild the UI
-      val rebuildAppUI = app.desc.eventLogDir.isDefined
+      val rebuildAppUI = app.desc.eventLogInfo.isDefined
       if (rebuildAppUI) {
-        val appName = app.desc.name
-        val eventLogDir = app.desc.eventLogDir.get
-        val ui = startPersistedSparkUI(appName, eventLogDir)
+        val ui = startPersistedSparkUI(app)
         app.desc.appUiUrl = ui.appUIAddress
         appIdToUI(app.id) = ui
       } else {
@@ -655,9 +653,22 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
-  /** Start a new SparkUI rendered from persisted storage */
-  def startPersistedSparkUI(appName: String, eventLogDir: String): SparkUI = {
-    val ui = new SparkUI(conf, nextPersistedUIPort)
+  /**
+   * Start a new SparkUI rendered from persisted storage. Assumes event logging information
+   * is available for the given application.
+   */
+  def startPersistedSparkUI(app: ApplicationInfo): SparkUI = {
+    val appName = app.desc.name
+    val eventLogInfo = app.desc.eventLogInfo.get
+    val eventLogDir = eventLogInfo.logDir
+    val eventCompressionCodec = eventLogInfo.compressionCodec
+    val appConf = new SparkConf
+    eventCompressionCodec.foreach { codec =>
+      appConf.set("spark.eventLog.compress", "true")
+      appConf.set("spark.io.compression.codec", codec)
+    }
+
+    val ui = new SparkUI(appConf, nextPersistedUIPort)
     ui.setAppName(appName)
     ui.bind()
     ui.start()
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 848b5c439bb5b..059e58824c39b 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -38,8 +38,7 @@ trait CompressionCodec {
 private[spark] object CompressionCodec {
 
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get(
-      "spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
+    createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -47,6 +46,8 @@ private[spark] object CompressionCodec {
       .getConstructor(classOf[SparkConf])
     ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
+
+  val DEFAULT_COMPRESSION_CODEC = classOf[LZFCompressionCodec].getName
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 3a92c1a2c6d2b..859ae74c23835 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.util.{JsonProtocol, FileLogger}
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, FileLogger}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -45,6 +46,14 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   private val logger =
     new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
 
+  // Information needed to replay the events logged by this listener later
+  val info = {
+    val compressionCodec = if (shouldCompress) {
+      Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
+    } else None
+    EventLoggingInfo(logDir, compressionCodec)
+  }
+
   logInfo("Logging events to %s".format(logDir))
 
   /** Log the event as JSON */
@@ -77,3 +86,6 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
 
   def stop() = logger.stop()
 }
+
+// If compression is not enabled, compressionCodec is None
+private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index f490d35ac6b78..395a160cc3f37 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.ui.eventLogDir)
+      sparkHome, sc.ui.appUIAddress, sc.ui.eventLogInfo)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
 
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b8a5d2dfc3332..656e678cd0f7f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui
 import org.eclipse.jetty.server.{Handler, Server}
 
 import org.apache.spark.{SparkConf, Logging, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{SparkReplayerBus, EventLoggingListener}
+import org.apache.spark.scheduler.{SparkReplayerBus, EventLoggingListener, EventLoggingInfo}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.EnvironmentUI
 import org.apache.spark.ui.exec.ExecutorsUI
@@ -77,8 +77,8 @@ private[spark] class SparkUI(val sc: SparkContext, conf: SparkConf, port: Int) e
 
   def setAppName(name: String) = appName = name
 
-  // Path to directory in which events are logged, if any
-  def eventLogDir: Option[String] = eventLogger.map { l => Some(l.logDir) }.getOrElse(None)
+  // Information needed to replay the events logged by this UI, if any
+  def eventLogInfo: Option[EventLoggingInfo] = eventLogger.map { l => Some(l.info) }.getOrElse(None)
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
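
For illustration, a minimal standalone sketch of the round trip this patch enables: the listener records which codec (if any) compressed the event logs, and the Master restores that codec before replaying them. This is a sketch, not shipped code; the log directory below is a hypothetical placeholder, and since EventLoggingInfo and the CompressionCodec object are private[spark], the sketch declares itself in package org.apache.spark so it compiles against them. Only the conf keys and APIs shown in the diff above are assumed.

// Sketch only: placed in package org.apache.spark to reach the
// private[spark] members touched by this patch.
package org.apache.spark

import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler.EventLoggingInfo

object EventLogCompressionRoundTrip {
  def main(args: Array[String]) {
    // Writer side (EventLoggingListener.info): with compression enabled,
    // record the codec class name alongside the log directory
    val writerConf = new SparkConf().set("spark.eventLog.compress", "true")
    val codecName =
      writerConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
    // The log directory here is a made-up placeholder
    val info = EventLoggingInfo("/tmp/spark-events/app-1234", Some(codecName))

    // Reader side (Master.startPersistedSparkUI): rebuild a per-application
    // conf so replay decompresses the logs with the codec that wrote them
    val appConf = new SparkConf
    info.compressionCodec.foreach { codec =>
      appConf.set("spark.eventLog.compress", "true")
      appConf.set("spark.io.compression.codec", codec)
    }
    val codec = CompressionCodec.createCodec(appConf)
    println("Replaying %s with %s".format(info.logDir, codec.getClass.getSimpleName))
  }
}

Previously, startPersistedSparkUI reused the Master's own conf, which need not enable compression or name the codec the application actually used; with this patch it builds a per-application conf from EventLoggingInfo instead.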