From d5154da443058f3d1adfafd1a449faff03ca5056 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 21 Mar 2014 23:28:06 -0700
Subject: [PATCH] Styling and comments

---
 .../spark/deploy/history/HistoryServer.scala  | 37 +++++++++++++------
 .../history/HistoryServerArguments.scala      | 15 +++-----
 .../spark/deploy/history/IndexPage.scala      |  9 ++---
 .../spark/scheduler/ReplayListenerBus.scala   |  2 +-
 .../scala/org/apache/spark/ui/WebUI.scala     |  2 +-
 5 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index b36abbd9d4ff4..c789d9bad2b92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -33,25 +33,28 @@ import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}
 
 /**
- * A web server that re-renders SparkUIs of finished applications.
+ * A web server that renders SparkUIs of finished applications.
  *
  * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
  * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
  *
  * The logging directory structure is as follows: Within the given base directory, each
- * application's event logs are maintained in the application's own sub-directory.
+ * application's event logs are maintained in the application's own sub-directory. This
+ * is the same structure as maintained in the event log write code path in
+ * EventLoggingListener.
  *
  * @param baseLogDir The base directory in which event logs are found
 * @param requestedPort The requested port to which this server is to be bound
  */
-class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
+class HistoryServer(val baseLogDir: String, requestedPort: Int)
   extends SparkUIContainer("History Server") with Logging {
 
+  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
   private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
   private val port = requestedPort
-  private val indexPage = new IndexPage(this)
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
+  private val conf = new SparkConf
   private val securityManager = new SecurityManager(conf)
+  private val indexPage = new IndexPage(this)
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheck = -1L
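A note on the fields above: checkForLogs() (in the next hunk) is gated on a logCheckReady helper whose body is not part of this patch. Below is only a sketch of how such a gate could look, assuming it throttles disk scans to once every UPDATE_INTERVAL_SECONDS; both names are taken from elsewhere in this patch, the body is an assumption:

    // Hypothetical sketch: logCheckReady is referenced by checkForLogs() but its
    // body is not shown in this patch. Assumed behavior: permit a new scan of the
    // log directory at most once every UPDATE_INTERVAL_SECONDS.
    private def logCheckReady: Boolean = {
      System.currentTimeMillis - lastLogCheck > HistoryServer.UPDATE_INTERVAL_SECONDS * 1000
    }
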
@@ -84,8 +87,8 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
    * from the application's event logs, attaches this UI to itself, and stores metadata
    * information for this application.
    *
-   * If the logs for an existing finished application are no longer found, remove all
-   * associated information and detach the SparkUI.
+   * If the logs for an existing finished application are no longer found, the server
+   * removes all associated information and detaches the SparkUI.
    */
   def checkForLogs() {
     if (logCheckReady) {
@@ -137,7 +140,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
     if (success) {
       attachUI(ui)
       val appName = if (appListener.applicationStarted) appListener.appName else appId
-      ui.setAppName("%s (history)".format(appName))
+      ui.setAppName("%s (finished)".format(appName))
       val startTime = appListener.startTime
       val endTime = appListener.endTime
       val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
@@ -155,7 +158,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
   /** Return the address of this server. */
   def getAddress = "http://" + host + ":" + boundPort
 
-  /** Return when this directory is last modified. */
+  /** Return when this directory was last modified. */
   private def getModificationTime(dir: FileStatus): Long = {
     val logFiles = fileSystem.listStatus(dir.getPath)
     if (logFiles != null) {
@@ -171,6 +174,16 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
   }
 }
 
+/**
+ * The recommended way of starting and stopping a HistoryServer is through the scripts
+ * start-history-server.sh and stop-history-server.sh. The path to a base log directory
+ * must be specified, while the requested UI port is optional. For example:
+ *
+ *   ./sbin/start-history-server.sh /tmp/spark-events 18080
+ *   ./sbin/start-history-server.sh hdfs://1.2.3.4:9000/spark-events
+ *
+ * This launches the HistoryServer as a Spark daemon.
+ */
 object HistoryServer {
   val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
 
@@ -178,13 +191,13 @@ object HistoryServer {
   val UPDATE_INTERVAL_SECONDS = 5
 
   def main(argStrings: Array[String]) {
-    val conf = new SparkConf
-    val args = new HistoryServerArguments(argStrings, conf)
-    val server = new HistoryServer(args.logDir, args.port, conf)
+    val args = new HistoryServerArguments(argStrings)
+    val server = new HistoryServer(args.logDir, args.port)
     server.bind()
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }
+    server.stop()
   }
 }
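Beyond the start/stop scripts documented in the new scaladoc, main() above shows that the server can also be driven directly. The following sketch uses only members visible in this patch (the constructor, bind(), and stop()); the directory and port are the example values from the scaladoc, not defaults:

    // Minimal programmatic launch, mirroring main() above. The log directory
    // and port are example values taken from the scaladoc in this patch.
    val server = new HistoryServer("hdfs://1.2.3.4:9000/spark-events", 18080)
    server.bind()    // start serving the index page and rendered SparkUIs
    // ... later, on shutdown:
    server.stop()
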
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index c142b18b94aea..d221ad32bb048 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -19,20 +19,20 @@ package org.apache.spark.deploy.history
 
 import java.net.URI
 
-import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam}
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.util.{IntParam, Utils}
+
 /**
- * Command-line parser for the master.
+ * Command-line parser for the HistoryServer.
  */
-private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf) {
+private[spark] class HistoryServerArguments(args: Array[String]) {
   var port = 18080
   var logDir = ""
 
   parse(args.toList)
 
-  def parse(args: List[String]): Unit = {
+  private def parse(args: List[String]): Unit = {
     args match {
       case ("--port" | "-p") :: IntParam(value) :: tail =>
         port = value
@@ -53,7 +53,7 @@ private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf
     validateLogDir()
   }
 
-  def validateLogDir() {
+  private def validateLogDir() {
     if (logDir == "") {
       System.err.println("Logging directory must be specified.")
       printUsageAndExit(1)
@@ -66,10 +66,7 @@ private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf
     }
   }
 
-  /**
-   * Print usage and exit JVM with the given exit code.
-   */
-  def printUsageAndExit(exitCode: Int) {
+  private def printUsageAndExit(exitCode: Int) {
     System.err.println(
       "Usage: HistoryServer [options]\n" +
       "\n" +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
index 19713c75679ab..6a251978cf643 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.history
 
-import java.text.SimpleDateFormat
-import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
@@ -26,7 +24,6 @@ import scala.xml.Node
 import org.apache.spark.ui.{UIUtils, WebUI}
 
 private[spark] class IndexPage(parent: HistoryServer) {
-  private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
   def render(request: HttpServletRequest): Seq[Node] = {
     parent.checkForLogs()
@@ -59,12 +56,12 @@ private[spark] class IndexPage(parent: HistoryServer) {
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val appName = if (info.started) info.name else parent.getAppId(info.logPath)
     val uiAddress = parent.getAddress + info.ui.basePath
-    val startTime = if (info.started) dateFmt.format(new Date(info.startTime)) else "Not started"
-    val endTime = if (info.finished) dateFmt.format(new Date(info.endTime)) else "Not finished"
+    val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
+    val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
     val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
     val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
     val logDirectory = parent.getAppId(info.logPath)
-    val lastUpdated = dateFmt.format(new Date(info.lastUpdated))
+    val lastUpdated = WebUI.formatDate(info.lastUpdated)
     <tr>
       <td><a href={uiAddress}>{appName}</a></td>
       <td>{startTime}</td>
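IndexPage now delegates date rendering to WebUI.formatDate, whose implementation is not part of this diff. A plausible sketch, assuming it simply applies the same "yyyy/MM/dd HH:mm:ss" pattern as the dateFmt field removed above:

    // Assumed shape of WebUI.formatDate (not shown in this patch): format a
    // millisecond timestamp with the pattern IndexPage previously owned.
    import java.text.SimpleDateFormat
    import java.util.Date

    def formatDate(timestamp: Long): String = {
      new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date(timestamp))
    }
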
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 8c61e0742bc1a..d28f6cc05aea4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -130,7 +130,7 @@ private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus
   /** If a compression codec is specified, wrap the given stream in a compression stream.
    */
   private def wrapForCompression(stream: InputStream): InputStream = {
-    compressionCodec.map { codec => codec.compressedInputStream(stream) }.getOrElse(stream)
+    compressionCodec.map(_.compressedInputStream(stream)).getOrElse(stream)
   }
 
   /** Return a list of paths representing files found in the given directory. */
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 586245abdedb5..2cc7582eca8a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -64,6 +64,6 @@ private[spark] object WebUI {
       return "%.0f min".format(minutes)
     }
     val hours = minutes / 60
-    return "%.1f h".format(hours)
+    "%.1f h".format(hours)
  }
 }
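The WebUI.scala hunk drops a redundant return in favor of Scala's last-expression value. Only the tail of formatDuration is visible above; for context, here is a sketch of the whole method consistent with those lines, where the earlier branches are assumptions:

    // Sketch of formatDuration: only the final four lines appear in the hunk
    // above; the seconds and minutes branches are assumed for illustration.
    def formatDuration(milliseconds: Long): String = {
      val seconds = milliseconds.toDouble / 1000
      if (seconds < 60) {
        return "%.0f s".format(seconds)
      }
      val minutes = seconds / 60
      if (minutes < 60) {
        return "%.0f min".format(minutes)
      }
      val hours = minutes / 60
      "%.1f h".format(hours)
    }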