Do not close file system with ReplayBus + fix bind address
andrewor14 committed Apr 7, 2014
Parent: bc46fc8 · Commit: a3598de
Showing 4 changed files with 33 additions and 15 deletions.
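
Context for the change: ReplayListenerBus used to create its own Hadoop FileSystem from the log path and close it when stopped. FileSystem.get returns a cached instance shared across the whole JVM, so closing one handle invalidates it for every other component holding it. The bus now takes its FileSystem from the caller and never closes it; the HistoryServer and Master own the handles and close them once, at shutdown. Independently, the HistoryServer used to bind Jetty to SPARK_PUBLIC_DNS, which can fail when that name is not a local interface; it now binds to the local hostname and only advertises the public one. The caching hazard in miniature (hostname and Configuration are illustrative only):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // Both calls return the SAME cached instance for this scheme and authority
    // (the hdfs://nn:8020 address is illustrative only).
    val a = FileSystem.get(new URI("hdfs://nn:8020/"), conf)
    val b = FileSystem.get(new URI("hdfs://nn:8020/"), conf)
    a.close()
    b.exists(new Path("/"))  // throws IOException("Filesystem closed")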
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,13 +17,11 @@

 package org.apache.spark.deploy.history

-import java.net.URI
 import javax.servlet.http.HttpServletRequest

 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}

 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.eclipse.jetty.servlet.ServletContextHandler
@@ -52,8 +50,9 @@ import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
 class HistoryServer(val baseLogDir: String, requestedPort: Int)
   extends SparkUIContainer("History Server") with Logging {

-  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
-  private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
+  private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
+  private val bindHost = Utils.localHostName()
+  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
   private val port = requestedPort
   private val conf = new SparkConf
   private val securityManager = new SecurityManager(conf)
@@ -73,8 +72,8 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
   /** Bind to the HTTP server behind this web interface */
   override def bind() {
     try {
-      serverInfo = Some(startJettyServer(host, port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort))
+      serverInfo = Some(startJettyServer(bindHost, port, handlers, conf))
+      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to bind HistoryServer", e)
@@ -133,7 +132,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
    * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
    */
   private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
-    val replayBus = new ReplayListenerBus(logPath)
+    val replayBus = new ReplayListenerBus(logPath, fileSystem)
     replayBus.start()

     // If the application completion file is found
@@ -157,14 +156,19 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
     } else {
       logWarning("Skipping incomplete application: %s".format(logPath))
     }
-    replayBus.stop()
   }

+  /** Stop the server and close the file system. */
+  override def stop() {
+    super.stop()
+    fileSystem.close()
+  }
+
   /** Parse app ID from the given log path. */
   def getAppId(logPath: String): String = logPath.split("/").last

   /** Return the address of this server. */
-  def getAddress = "http://" + host + ":" + boundPort
+  def getAddress = "http://" + publicHost + ":" + boundPort

   /** Return when this directory was last modified. */
   private def getModificationTime(dir: FileStatus): Long = {
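
Why the host split matters: binding Jetty to SPARK_PUBLIC_DNS fails when that name does not resolve to a local interface (a cloud public hostname, for instance), which is the bind-address bug named in the commit title. The resulting pattern, in brief:

    // Listen on a name guaranteed to be local...
    val bindHost = Utils.localHostName()
    // ...but advertise the externally routable one when configured.
    val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
    // bind:    startJettyServer(bindHost, port, handlers, conf)
    // display: "http://" + publicHost + ":" + boundPort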
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,6 +29,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.FileSystem

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
@@ -45,7 +46,8 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager) extends Actor with Logging {
+    val securityMgr: SecurityManager)
+  extends Actor with Logging {

   import context.dispatcher // to use Akka's scheduler.schedule()

@@ -71,6 +73,7 @@
   var nextAppNumber = 0

   val appIdToUI = new HashMap[String, SparkUI]
+  val fileSystems = new HashSet[FileSystem]

   val drivers = new HashSet[DriverInfo]
   val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -149,6 +152,7 @@

   override def postStop() {
     webUi.stop()
+    fileSystems.foreach(_.close())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -662,11 +666,11 @@
     val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
     val replayBus = new ReplayListenerBus(eventLogDir)
     val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
+    fileSystems += replayBus.fileSystem

     // Do not call ui.bind() to avoid creating a new server for each application
     ui.start()
     val success = replayBus.replay()
-    replayBus.stop()
     if (success) Some(ui) else None
   }

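
The Master-side ownership pattern, reduced to a sketch (simplified from the diff above; onAppFinished and shutdown are hypothetical names, not Master methods):

    import scala.collection.mutable.HashSet
    import org.apache.hadoop.fs.FileSystem

    val fileSystems = new HashSet[FileSystem]

    // Per finished application: keep the handle instead of closing it.
    def onAppFinished(eventLogDir: String): Boolean = {
      val replayBus = new ReplayListenerBus(eventLogDir)
      fileSystems += replayBus.fileSystem
      replayBus.replay()  // returns false if the log could not be replayed
      // Deliberately no close() here: the handle is cached and shared
      // JVM-wide, so closing it now could break other users mid-flight.
    }

    // Once, at shutdown (Master.postStop): safe, nothing is replaying anymore.
    def shutdown(): Unit = fileSystems.foreach(_.close())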
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler

 import java.io.InputStream
-import java.net.URI

 import scala.io.Source

@@ -36,8 +35,13 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * This class expects files to be appropriately prefixed as specified in EventLoggingListener.
  * There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
  */
-private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus with Logging {
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+private[spark] class ReplayListenerBus(
+    logDir: String,
+    val fileSystem: FileSystem)
+  extends SparkListenerBus with Logging {
+
+  def this(logDir: String) = this(logDir, Utils.getHadoopFileSystem(logDir))
+
   private var applicationComplete = false
   private var compressionCodec: Option[CompressionCodec] = None
   private var logPaths = Array[Path]()
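
The two constructors now encode FileSystem ownership. A hypothetical call site for each (logDir and its path are illustrative):

    val logDir = "/tmp/spark-events"  // illustrative path

    // Primary constructor: the caller supplies, and therefore owns, the handle.
    val fs = Utils.getHadoopFileSystem(logDir)
    val bus = new ReplayListenerBus(logDir, fs)
    bus.start()
    bus.replay()
    // fs stays open; its owner closes it at shutdown.

    // Auxiliary constructor: the bus looks the handle up itself, but exposes
    // it as a val so the caller can still track and eventually close it.
    val bus2 = new ReplayListenerBus(logDir)
    val handle = bus2.fileSystem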
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag

@@ -1022,4 +1021,11 @@
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Return a Hadoop FileSystem with the scheme encoded in the given path.
+   */
+  def getHadoopFileSystem(path: String): FileSystem = {
+    getHadoopFileSystem(new URI(path))
+  }
 }
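
A before/after view of the call sites this overload enables (the HDFS URL is illustrative):

    // Previously, callers wrapped the path themselves:
    val fs1 = Utils.getHadoopFileSystem(new URI("hdfs://nn:8020/spark-events"))
    // The new String overload performs the URI conversion internally:
    val fs2 = Utils.getHadoopFileSystem("hdfs://nn:8020/spark-events")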
