diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 48ca6cfe05939..fe46bc43cec01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.deploy.history
 
-import java.net.URI
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.eclipse.jetty.servlet.ServletContextHandler
@@ -52,8 +50,9 @@ import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
 class HistoryServer(val baseLogDir: String, requestedPort: Int)
   extends SparkUIContainer("History Server") with Logging {
 
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
-  private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
+  private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
+  private val bindHost = Utils.localHostName()
+  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
   private val port = requestedPort
   private val conf = new SparkConf
   private val securityManager = new SecurityManager(conf)
@@ -73,8 +72,8 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
   /** Bind to the HTTP server behind this web interface */
   override def bind() {
     try {
-      serverInfo = Some(startJettyServer(host, port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort))
+      serverInfo = Some(startJettyServer(bindHost, port, handlers, conf))
+      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to bind HistoryServer", e)
@@ -133,7 +132,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
    * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
    */
   private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
-    val replayBus = new ReplayListenerBus(logPath)
+    val replayBus = new ReplayListenerBus(logPath, fileSystem)
     replayBus.start()
 
     // If the application completion file is found
@@ -157,14 +156,19 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
     } else {
       logWarning("Skipping incomplete application: %s".format(logPath))
    }
-    replayBus.stop()
+  }
+
+  /** Stop the server and close the file system. */
+  override def stop() {
+    super.stop()
+    fileSystem.close()
   }
 
   /** Parse app ID from the given log path. */
   def getAppId(logPath: String): String = logPath.split("/").last
 
   /** Return the address of this server. */
-  def getAddress = "http://" + host + ":" + boundPort
+  def getAddress = "http://" + publicHost + ":" + boundPort
 
   /** Return when this directory was last modified. */
   private def getModificationTime(dir: FileStatus): Long = {
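
Note on the HistoryServer change above: the UI now binds to the machine's local hostname but advertises SPARK_PUBLIC_DNS, when set, in its logs and address. A minimal self-contained sketch of that bind-host vs. public-host pattern, assuming nothing beyond the JDK (the object name and port below are illustrative, not Spark's):

import java.net.InetAddress

object HostSplitSketch {
  def main(args: Array[String]): Unit = {
    // Bind the server socket to the machine's own hostname...
    val bindHost = InetAddress.getLocalHost.getHostName
    // ...but advertise the public DNS name if one is configured, so that
    // links printed in logs work from outside a NAT or cloud network.
    val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
    println("binding on %s, advertising http://%s:18080".format(bindHost, publicHost))
  }
}
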
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f22f0452108a2..f1cd2857fd1b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,6 +29,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
@@ -45,7 +46,8 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager) extends Actor with Logging {
+    val securityMgr: SecurityManager)
+  extends Actor with Logging {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
@@ -71,6 +73,7 @@ private[spark] class Master(
   var nextAppNumber = 0
   val appIdToUI = new HashMap[String, SparkUI]
+  val fileSystems = new HashSet[FileSystem]
 
   val drivers = new HashSet[DriverInfo]
   val completedDrivers = new ArrayBuffer[DriverInfo]
 
@@ -149,6 +152,7 @@ private[spark] class Master(
 
   override def postStop() {
     webUi.stop()
+    fileSystems.foreach(_.close())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -662,11 +666,11 @@ private[spark] class Master(
     val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
     val replayBus = new ReplayListenerBus(eventLogDir)
     val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
+    fileSystems += replayBus.fileSystem
 
     // Do not call ui.bind() to avoid creating a new server for each application
     ui.start()
     val success = replayBus.replay()
-    replayBus.stop()
     if (success) Some(ui) else None
   }
 
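
The Master change follows a collect-then-close pattern: every FileSystem created for replay is tracked in a HashSet and closed once in postStop(), rather than being closed in the middle of a replay. A rough standalone sketch of the same idea (the class and method names here are made up for illustration, not Spark's):

import scala.collection.mutable.HashSet

import org.apache.hadoop.fs.FileSystem

class FileSystemRegistry {
  private val fileSystems = new HashSet[FileSystem]

  // Remember a handle so it can be released later.
  def register(fs: FileSystem): Unit = synchronized { fileSystems += fs }

  // Close each tracked handle exactly once; the HashSet de-duplicates
  // repeated registrations of the same FileSystem instance.
  def closeAll(): Unit = synchronized {
    fileSystems.foreach(_.close())
    fileSystems.clear()
  }
}
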
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index c39482f6fbec1..06ff32d2a0ef4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.io.InputStream
-import java.net.URI
 
 import scala.io.Source
 
@@ -36,8 +35,13 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * This class expects files to be appropriately prefixed as specified in EventLoggingListener.
  * There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
  */
-private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus with Logging {
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+private[spark] class ReplayListenerBus(
+    logDir: String,
+    val fileSystem: FileSystem)
+  extends SparkListenerBus with Logging {
+
+  def this(logDir: String) = this(logDir, Utils.getHadoopFileSystem(logDir))
+
   private var applicationComplete = false
   private var compressionCodec: Option[CompressionCodec] = None
   private var logPaths = Array[Path]()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4435b21a7505e..105dd222a4e48 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
@@ -1022,4 +1021,11 @@
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Return a Hadoop FileSystem with the scheme encoded in the given path.
+   */
+  def getHadoopFileSystem(path: String): FileSystem = {
+    getHadoopFileSystem(new URI(path))
+  }
 }
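
The new Utils.getHadoopFileSystem(String) overload simply wraps the URI-based one, and the two-argument ReplayListenerBus constructor lets a caller inject a shared FileSystem while the one-argument auxiliary constructor keeps the old derive-from-path behavior. A sketch of what the overload resolves to, assuming only hadoop-common on the classpath (Spark builds its Configuration via SparkHadoopUtil, as the patch shows; a bare Configuration is used here to keep the sketch self-contained):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object SchemeLookupSketch {
  def main(args: Array[String]): Unit = {
    // Utils.getHadoopFileSystem("file:///tmp/spark-events") boils down to
    // this call: the URI's scheme selects the FileSystem implementation.
    val fs = FileSystem.get(new URI("file:///tmp/spark-events"), new Configuration())
    println(fs.getUri) // file:///
  }
}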