diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index a7368f9f3dfbe..964ede01922b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -39,8 +39,7 @@ private[deploy] object DeployMessages { port: Int, cores: Int, memory: Int, - webUiPort: Int, - publicAddress: String) + workerWebUiUrl: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") assert (port > 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f98b531316a3d..362518c31c15c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -191,7 +191,7 @@ private[spark] class Master( System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) @@ -201,7 +201,7 @@ private[spark] class Master( sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerUiPort, publicAddress) + sender, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! 
RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c5fa9cf7d7c2d..f775d0d783e0f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -30,8 +30,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiPort: Int, - val publicAddress: String) + val webUiAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -99,10 +98,6 @@ private[spark] class WorkerInfo( coresUsed -= driver.desc.cores } - def webUiAddress : String = { - "http://" + this.publicAddress + ":" + this.webUiPort - } - def setState(state: WorkerState.Value) = { this.state = state } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 9b52cb06fb6fa..81cdd7a09d1d1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -79,6 +79,7 @@ private[spark] class Worker( var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + var workerWebUiUrl: String = "" @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -139,6 +140,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + workerWebUiUrl = "http://" + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) @@ -162,7 +164,7 @@ private[spark] class Worker( for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + 
"...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl) } } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 2a27d49d2de05..371002abe4c5b 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,17 +17,23 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.URL import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import scala.collection.mutable.StringBuilder import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.{Request, Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector +import org.eclipse.jetty.http.HttpStatus +import org.eclipse.jetty.util.ssl.SslContextFactory + import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} @@ -180,12 +186,32 @@ private[spark] object JettyUtils extends Logging { serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection - collection.setHandlers(handlers.toArray) addFilters(handlers, conf) // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { - val server = new Server(new InetSocketAddress(hostName, currentPort)) + val server = new Server + // Create a connector on port currentPort to listen for HTTP requests + val httpConnector = new 
SelectChannelConnector()
+      httpConnector.setPort(currentPort)
+      httpConnector.setHost(hostName)
+
+      if (conf.get("spark.ui.https.enabled", "false").toBoolean) {
+        // Keep 0 (ephemeral) as 0; otherwise wrap around without hitting a privileged port
+        val securePort =
+          if (currentPort == 0) 0 else (currentPort + 1 - 1024) % (65536 - 1024) + 1024
+        // Create a connector on port securePort to listen for HTTPS requests
+        val connector = buildSslSelectChannelConnector(securePort, conf)
+        connector.setHost(hostName)
+        server.setConnectors(Seq(httpConnector, connector).toArray)
+
+        // Redirect HTTP requests to the port the HTTPS connector actually bound to
+        val redirect = createRedirectHttpsHandler(connector.getLocalPort, "https")
+        collection.setHandlers((redirect +: handlers).toArray)
+      } else {
+        server.addConnector(httpConnector)
+        collection.setHandlers(handlers.toArray)
+      }
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
@@ -205,10 +231,95 @@ private[spark] object JettyUtils extends Logging {
     ServerInfo(server, boundPort, collection)
   }
 
+  /**
+   * Build a URI string of the form scheme://server:port followed by `path` and, when
+   * `query` is non-empty, "?query". A server name containing ':' that is not already
+   * bracketed is wrapped in '[' and ']' (IPv6 literal). The port is always written. Based on:
+   * https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/
+   * jetty/util/URIUtil.java#L726-L733
+   */
+  private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = {
+    val builder = new StringBuilder
+
+    if (server.indexOf(':') >= 0 && server.charAt(0) != '[') {
+      builder.append(scheme).append("://").append('[').append(server).append(']')
+    } else {
+      builder.append(scheme).append("://").append(server)
+    }
+    builder.append(':').append(port)
+    builder.append(path)
+    if (query != null && query.length > 0) builder.append('?').append(query)
+    builder.toString
+  }
+
+  /**
+   * Create a root-context handler that answers every non-secure request with a redirect to
+   * the same server name, path and query on `securePort` using `scheme`.
+   * `securePort` is passed by name so it is evaluated per request, which lets callers hand in
+   * a port that is only known once the HTTPS connector has been opened.
+   */
+  private def createRedirectHttpsHandler(securePort: => Int, scheme: String): ContextHandler = {
+    val redirectHandler: ContextHandler = new ContextHandler
+    redirectHandler.setContextPath("/")
+    redirectHandler.setHandler(new AbstractHandler {
+      override def handle(
+          target: String,
+          baseRequest: Request,
+          request: HttpServletRequest,
+          response: HttpServletResponse): Unit = {
+        // Leave requests that already arrived over HTTPS untouched
+        if (!baseRequest.isSecure) {
+          val httpsURI = newURI(scheme, baseRequest.getServerName, securePort,
+            baseRequest.getRequestURI, baseRequest.getQueryString)
+          response.setContentLength(0)
+          // encodeRedirectURL returns the encoded URL; use its result for the redirect
+          response.sendRedirect(response.encodeRedirectURL(httpsURI))
+          baseRequest.setHandled(true)
+        }
+      }
+    })
+    redirectHandler
+  }
+
   /** Attach a prefix to the given path, but avoid returning an empty path */
   private def attachPrefix(basePath: String, relativePath: String): String = {
     if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
   }
+
+  /**
+   * Create an SSL connector for the given port, configured from every `spark.ui.ssl.*`
+   * entry in `conf`.
+   */
+  private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = {
+    val ctxFactory = new SslContextFactory()
+    conf.getAll
+      .filter { case (k, _) => k.startsWith("spark.ui.ssl.") }
+      .foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) }
+
+    val connector = new SslSelectChannelConnector(ctxFactory)
+    connector.setPort(port)
+    connector
+  }
+
+  /**
+   * Apply one `spark.ui.ssl.*` setting to the given SslContextFactory. Keys that are not
+   * recognized are logged and skipped instead of failing with a MatchError.
+   */
+  private def setSslContextFactoryProps(
+      key: String, value: String, ctxFactory: SslContextFactory) = {
+    key match {
+      case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean)
+      case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value)
+      case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value)
+      case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value)
+      case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value)
+      case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value)
+      case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value)
+      case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value)
+      case _ => logWarning("Ignoring unrecognized SSL config for the web UI: " + key)
+    }
+  }
+
 }
 
 private[spark] case class ServerInfo(
diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore
new file mode 100644
index 0000000000000..f30716b57b302
Binary files /dev/null and b/core/src/test/resources/spark.keystore differ
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 3f1cd0752e766..429daa5957aeb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite {
     createDriverDesc(), new Date())
 
   def createWorkerInfo(): WorkerInfo = {
-    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
     workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
     workerInfo
   }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 92a21f82f3c21..66d1237605a5c 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -124,6 +124,30 @@ class UISuite extends FunSuite {
     server.close()
   }
 
+  test("jetty with https selects different port under contention") {
+    val server = new ServerSocket(0)
+    val startPort = server.getLocalPort
+
+    val sparkConf = new SparkConf()
+      .set("spark.ui.https.enabled", "true")
+      .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
+      .set("spark.ui.ssl.server.keystore.password", "123456")
+      .set("spark.ui.ssl.server.keystore.keypassword", "123456")
+    val serverInfo1 = JettyUtils.startJettyServer(
+      "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1")
+    val serverInfo2 = JettyUtils.startJettyServer(
+      "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2")
+    // Allow some wiggle room in case ports on the machine are under contention
+    val boundPort1 = serverInfo1.boundPort
+    val boundPort2 = serverInfo2.boundPort
+    assert(boundPort1 != startPort)
+    assert(boundPort2 != startPort)
+    assert(boundPort1 != boundPort2)
+    serverInfo1.server.stop()
+    serverInfo2.server.stop()
+    server.close()
+  }
+
   test("jetty binds to port 0 correctly") {
     val serverInfo = JettyUtils.startJettyServer(
       "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
@@ -137,6 +161,25 @@ class UISuite extends FunSuite {
     }
   }
 
+  test("jetty with https binds to port 0 correctly") {
+    val sparkConf = new SparkConf()
+      .set("spark.ui.https.enabled", "true")
+      .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
+      .set("spark.ui.ssl.server.keystore.password", "123456")
+      .set("spark.ui.ssl.server.keystore.keypassword", "123456")
+    val serverInfo = JettyUtils.startJettyServer(
+      "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf)
+    val server = serverInfo.server
+    val boundPort = serverInfo.boundPort
+    assert(server.getState === "STARTED")
+    assert(boundPort != 0)
+    Try { new 
ServerSocket(boundPort) } match { + case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) + case Failure(e) => + } + serverInfo.server.stop() + } + test("verify appUIAddress contains the scheme") { withSpark(newSparkContext()) { sc => val ui = sc.ui.get diff --git a/docs/security.md b/docs/security.md index ec0523184d665..83198d4ec0496 100644 --- a/docs/security.md +++ b/docs/security.md @@ -11,12 +11,69 @@ Spark currently supports authentication via a shared secret. Authentication can ## Web UI -The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting and by using [Jetty https/SSL](http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html) via the `spark.ui.https.enabled` setting. + +### Authentication + +A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. 
The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications. +### Encryption + +Spark uses SSL (Secure Sockets Layer) to establish an encrypted link between the UI server and the browser client. The config `spark.ui.https.enabled` switches encryption on; the other SSL configuration properties are as follows: + +
Property Name | Default | Meaning |
---|---|---|
spark.ui.https.enabled | +false | +Whether to enable HTTPS for the web UI. | +
spark.ui.ssl.server.keystore.keypassword | +(none) | +The password for the specific key within the key store. | +
spark.ui.ssl.server.keystore.location | +(none) | +The file or URL of the SSL Key store. | +
spark.ui.ssl.server.keystore.password | +(none) | +The password for the key store. | +
spark.ui.ssl.server.keystore.type | +JKS | +The type of the key store (default "JKS"). | +
spark.ui.ssl.client.https.needAuth | +(none) | +Set to true if SSL requires client authentication. | +
spark.ui.ssl.server.truststore.location | +(none) | +The file name or URL of the trust store location. | +
spark.ui.ssl.server.truststore.password | +(none) | +The password for the trust store. | +
spark.ui.ssl.server.truststore.type | +JKS | +The type of the trust store (default "JKS"). | +