Skip to content

Commit

Permalink
Address Thomas' comments
Browse files Browse the repository at this point in the history
The biggest changes here include:

(1) Periodically checking for event log updates in the background, instead
      of on refresh,
(2) Use a Long instead of a linearly scaling HashSet to keep track of the
      applications whose UIs are purposefully not rendered, and
(3) Adding Spark user as a new column. This includes adding a new field to
      the ApplicationStart event.
  • Loading branch information
andrewor14 committed Apr 9, 2014
1 parent 248cb3d commit 19e1fb4
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 80 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ class SparkContext(

/** Post the application start event */
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, startTime))
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
}

/**
Expand Down
169 changes: 113 additions & 56 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark.deploy.history
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler
Expand Down Expand Up @@ -56,14 +54,44 @@ class HistoryServer(
import HistoryServer._

private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
private val port = requestedPort
private val securityManager = new SecurityManager(conf)
private val indexPage = new IndexPage(this)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheck = -1L
private var lastLogCheckTime = -1L

// If an application is last updated after this threshold, then its UI is retained
private var updateTimeThreshold = -1L

// Number of applications hidden from the UI because the application limit has been reached
private var numApplicationsHidden = 0

@volatile private var stopped = false

/**
* A background thread that periodically checks for event log updates on disk.
*
* If a log check is invoked manually in the middle of a period, this thread re-adjusts the
* time at which it performs the next log check to maintain the same period as before.
*/
private val logCheckingThread = new Thread {
override def run() {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
checkForLogs()
Thread.sleep(UPDATE_INTERVAL_MS)
} else {
// If the user has manually checked for logs recently, wait until
// UPDATE_INTERVAL_MS after the last check time
Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
}
}
}
}

private val handlers = Seq[ServletContextHandler](
createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
Expand All @@ -74,13 +102,20 @@ class HistoryServer(
// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

// A set of recently removed applications that the server should avoid re-rendering
val appIdBlacklist = mutable.HashSet[String]()
/**
* Start the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
def start() {
logCheckingThread.start()
}

/** Bind to the HTTP server behind this web interface */
/** Bind to the HTTP server behind this web interface. */
override def bind() {
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, conf))
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
Expand All @@ -91,7 +126,8 @@ class HistoryServer(
}

/**
* Asynchronously check for any updates to event logs in the base directory.
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
*
* If a new finished application is found, the server renders the associated SparkUI
* from the application's event logs, attaches this UI to itself, and stores metadata
Expand All @@ -100,41 +136,54 @@ class HistoryServer(
* If the logs for an existing finished application are no longer found, the server
* removes all associated information and detaches the SparkUI.
*/
def checkForLogs() {
if (logCheckReady) {
lastLogCheck = System.currentTimeMillis
val asyncCheck = future {
def checkForLogs() = synchronized {
if (serverInfo.isDefined) {
lastLogCheckTime = System.currentTimeMillis
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
try {
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Forget about any SparkUIs that can no longer be found
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
val mostRecentAppIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
if (!mostRecentAppIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
appIdBlacklist.clear()
updateTimeThreshold = -1L
}
}
appIdBlacklist.retain(appIds.contains)

// Keep track of the number of applications hidden from the UI this round
var _numApplicationsHidden = 0

// Render SparkUI for any new completed applications
logDirs.foreach { dir =>
val path = dir.getPath.toString
val appId = getAppId(path)
val lastUpdated = getModificationTime(dir)
if (!appIdToInfo.contains(appId) && !appIdBlacklist.contains(appId)) {
maybeRenderUI(appId, path, lastUpdated)
if (!appIdToInfo.contains(appId)) {
if (lastUpdated > updateTimeThreshold) {
maybeRenderUI(appId, path, lastUpdated)
} else {
// This application was previously blacklisted due to the application limit
_numApplicationsHidden += 1
}
}
// If the cap is reached, remove the least recently updated application
if (appIdToInfo.size > RETAINED_APPLICATIONS) {
removeOldestApp()
_numApplicationsHidden += 1
}
}

numApplicationsHidden = _numApplicationsHidden

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
asyncCheck.onFailure { case t =>
logError("Unable to synchronize HistoryServer with files on disk: ", t)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
}
}

Expand All @@ -161,11 +210,12 @@ class HistoryServer(
if (success && appListener.applicationStarted) {
attachUI(ui)
val appName = appListener.appName
ui.setAppName("%s (finished)".format(appName))
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
val endTime = appListener.endTime
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
appIdToInfo(appId) = info
ui.setAppName("%s (finished)".format(appName))
appIdToInfo(appId) =
ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, sparkUser, logPath, ui)
} else {
logWarning("Reconstructing application UI was unsuccessful. Either no event logs were" +
"found or the event signaling application start is missing: %s".format(logPath))
Expand All @@ -175,9 +225,27 @@ class HistoryServer(
}
}

/**
* Remove the oldest application and detach its associated UI.
*
* As an optimization, record the last updated time of this application as the minimum
* update time threshold. Only applications with a last updated time that exceeds this
* threshold will be retained by the server. This avoids re-rendering an old application
* that is recently removed.
*/
private def removeOldestApp() {
val appToRemove = appIdToInfo.toSeq.minBy { case (_, info) => info.lastUpdated }
appToRemove match { case (id, info) =>
appIdToInfo.remove(id)
detachUI(info.ui)
updateTimeThreshold = info.lastUpdated
}
}

/** Stop the server and close the file system. */
override def stop() {
super.stop()
stopped = true
fileSystem.close()
}

Expand All @@ -187,39 +255,26 @@ class HistoryServer(
/** Return the address of this server. */
def getAddress: String = "http://" + publicHost + ":" + boundPort

/** Return the total number of application logs found, blacklisted or not. */
def getTotalApplications: Int = appIdToInfo.size + appIdBlacklist.size
/** Return the total number of application logs found, whether or not the UI is retained. */
def getTotalApplications: Int = appIdToInfo.size + numApplicationsHidden

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
val logFiles = fileSystem.listStatus(dir.getPath)
if (logFiles != null) {
logFiles.map(_.getModificationTime).max
} else {
dir.getModificationTime
}
}

/**
* Remove the oldest application and detach its associated UI. As an optimization, add the
* application to a blacklist to avoid re-rendering it the next time.
*/
private def removeOldestApp() {
val appToRemove = appIdToInfo.toSeq.minBy { case (_, info) => info.lastUpdated }
appToRemove match { case (id, info) =>
appIdToInfo.remove(id)
detachUI(info.ui)
appIdBlacklist.add(id)
try {
val logFiles = fileSystem.listStatus(dir.getPath)
if (logFiles != null) {
logFiles.map(_.getModificationTime).max
} else {
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
-1L
}
}

/** Return whether the last log check has happened sufficiently long ago. */
private def logCheckReady: Boolean = {
System.currentTimeMillis - lastLogCheck > UPDATE_INTERVAL_SECONDS * 1000
}
}


/**
* The recommended way of starting and stopping a HistoryServer is through the scripts
* start-history-server.sh and stop-history-server.sh. The path to a base log directory
Expand All @@ -233,18 +288,19 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf

// Minimum interval between each check for logs, which requires a disk access (seconds)
private val UPDATE_INTERVAL_SECONDS = conf.getInt("spark.history.updateInterval", 5)
// Interval between each check for event log updates
val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000

// How many applications to retain
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 20)
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 250)

private val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR

def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, args.port, conf)
server.bind()
server.start()

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
Expand All @@ -258,6 +314,7 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
logPath: String,
ui: SparkUI) {
def started = startTime != -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,27 @@ import org.apache.spark.ui.{UIUtils, WebUI}
private[spark] class IndexPage(parent: HistoryServer) {

def render(request: HttpServletRequest): Seq[Node] = {
parent.checkForLogs()

// Populate app table, with most recently modified app first
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
<h4>
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications} Finished Applications
</h4>
{appTable}
</ul>
{
if (parent.appIdToInfo.size > 0) {
<h4>
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications}
Finished Application{if (parent.getTotalApplications > 1) "s" else ""}
</h4> ++
appTable
} else {
<h4>No Finished Applications Found</h4>
}
}
</div>
</div>

UIUtils.basicSparkPage(content, "History Server")
}

Expand All @@ -52,6 +55,7 @@ private[spark] class IndexPage(parent: HistoryServer) {
"Started",
"Finished",
"Duration",
"Spark User",
"Log Directory",
"Last Updated")

Expand All @@ -62,13 +66,15 @@ private[spark] class IndexPage(parent: HistoryServer) {
val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = parent.getAppId(info.logPath)
val lastUpdated = WebUI.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
<td>{sparkUser}</td>
<td>{logDirectory}</td>
<td>{lastUpdated}</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package org.apache.spark.scheduler
*/
private[spark] class ApplicationEventListener extends SparkListener {
var appName = "<Not Started>"
var sparkUser = "<Not Started>"
var startTime = -1L
var endTime = -1L

Expand All @@ -41,6 +42,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = applicationStart.appName
startTime = applicationStart.time
sparkUser = applicationStart.sparkUser
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
Expand Down
Loading

0 comments on commit 19e1fb4

Please sign in to comment.