[SPARK-2750] support https in spark web ui #1980

Closed · wants to merge 24 commits · Changes from 22 commits
@@ -39,8 +39,7 @@ private[deploy] object DeployMessages {
port: Int,
cores: Int,
memory: Int,
webUiPort: Int,
publicAddress: String)
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
@@ -191,7 +191,7 @@ private[spark] class Master(
System.exit(0)
}

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
@@ -201,7 +201,7 @@
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
sender, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
@@ -30,8 +30,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
val publicAddress: String)
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
@@ -99,10 +98,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}

def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

def setState(state: WorkerState.Value) = {
this.state = state
}
@@ -79,6 +79,7 @@ private[spark] class Worker(
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
var workerWebUiUrl: String = ""
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
@@ -139,6 +140,7 @@
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = "http://" + publicAddress + ":" + webUi.boundPort
registerWithMaster()

metricsSystem.registerSource(workerSource)
@@ -162,7 +164,7 @@
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl)
}
}

98 changes: 94 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,17 +17,23 @@

package org.apache.spark.ui

import java.net.{InetSocketAddress, URL}
import java.net.URL
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import scala.collection.mutable.StringBuilder
import scala.language.implicitConversions
import scala.xml.Node

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.{Request, Connector, Server}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.eclipse.jetty.server.nio.SelectChannelConnector
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.eclipse.jetty.http.HttpStatus
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

@@ -180,12 +186,32 @@ private[spark] object JettyUtils extends Logging {
serverName: String = ""): ServerInfo = {

val collection = new ContextHandlerCollection
Contributor:

It looks like we used to bind Jetty to a specific hostname / interface, but it doesn't look like the new code uses hostName anywhere. To preserve the same behavior, maybe we should call connector.setHost() after connector.setPort() in the code below.

Contributor (author):

Here we should only call setHost(); the port has already been set in getConnector(currentPort, conf).

collection.setHandlers(handlers.toArray)
addFilters(handlers, conf)

// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val server = new Server
// Create a connector on port currentPort to listen for HTTP requests
val httpConnector = new SelectChannelConnector()
httpConnector.setPort(currentPort)
httpConnector.setHost(hostName)

if (conf.get("spark.ui.https.enabled", "false").toBoolean) {
// If the new port wraps around, do not try a privileged port
val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024
Contributor:

What if this port isn't free? It's dangerous to make assumptions about whether a particular port is free, especially since this can cause problems in Jenkins, where we might have multiple builds running concurrently on the same machine that are contending for ports.

Could we use Utils.startServiceOnPort for this?

Contributor (author):

Hey, note that connect will be passed to Utils.startServiceOnPort, so Utils.startServiceOnPort will try to start the HTTP service on currentPort and the HTTPS service on securePort. If securePort (or currentPort) is not free, it will retry until the maximum number of retries is reached.

Contributor:

Ah, I see. Thanks for explaining.
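The retry contract described in this exchange can be sketched as follows. This is a hedged, standalone sketch of the behavior attributed to `Utils.startServiceOnPort` in the discussion, not Spark's actual implementation: the `startService` closure is invoked on successive candidate ports (wrapping around the non-privileged range, mirroring the `securePort` arithmetic above) until it binds or the retry limit is exhausted.

```scala
import java.net.BindException

// Hedged sketch only — not Spark's real Utils.startServiceOnPort.
object PortRetrySketch {
  def startServiceOnPort[T](startPort: Int, startService: Int => T, maxRetries: Int = 16): (T, Int) = {
    for (offset <- 0 to maxRetries) {
      // Wrap around the non-privileged range; never fall below port 1024
      val tryPort = if (startPort == 0) 0 else (startPort + offset - 1024) % (65536 - 1024) + 1024
      try {
        return (startService(tryPort), tryPort)
      } catch {
        case _: BindException if offset < maxRetries =>
          // Port in use: fall through and retry on the next candidate
      }
    }
    throw new BindException(s"Failed to bind a service after $maxRetries retries")
  }
}

// Example: a service that only manages to bind on its third attempt
var attempts = 0
val (_, boundPort) = PortRetrySketch.startServiceOnPort[Int](8080, p => {
  attempts += 1
  if (attempts < 3) throw new BindException("port in use") else p
})
// boundPort is 8082: 8080 and 8081 were "occupied"
```

Because `connect` starts both connectors inside one attempt, a collision on either `currentPort` or `securePort` throws, and the whole pair is retried at the next offset — which is the point the author is making above.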

val scheme = "https"
// Create a connector on port securePort to listen for HTTPS requests
val connector = buildSslSelectChannelConnector(securePort, conf)
connector.setHost(hostName)
server.setConnectors(Seq(httpConnector, connector).toArray)

// redirect the HTTP requests to HTTPS port
val newHandlers = Seq(createRedirectHttpsHandler(securePort, scheme)) ++ handlers
collection.setHandlers(newHandlers.toArray)
} else {
server.addConnector(httpConnector)
collection.setHandlers(handlers.toArray)
}
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
@@ -205,10 +231,74 @@
ServerInfo(server, boundPort, collection)
}

// to generate a new url string scheme://server:port+path
Contributor:

Hi @scwf,

The reason I asked for a comment is that it seems like the method is doing a little more than just that. For example, L238 seems to be doing some sort of parsing of the server string, so it's more than just concatenating the different arguments into a URL.

It would be nice if the comment explained exactly what the relationship between the input and the output is. A unit test wouldn't hurt either.

Contributor (author):

Hi @vanzin, actually here I just followed the Jetty 9 code (see https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/jetty/util/URIUtil.java#L726-L733), since there is no newURI method in the Jetty version Spark uses (Spark uses Jetty 8).

And L238 handles the IPv6-address case; we can remove it here if it's unnecessary.

Contributor:

Could you put that reference in the code itself, so we know where it comes from? Thanks!

Contributor (author):

Ok, updated.

private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = {
val builder = new StringBuilder

if (server.indexOf(':') >= 0 && server.charAt(0) != '[') {
builder.append(scheme).append("://").append('[').append(server).append(']')
} else {
builder.append(scheme).append("://").append(server)
}
builder.append(':').append(port)
builder.append(path)
if (query != null && query.length > 0) builder.append('?').append(query)
builder.toString
}
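For illustration, here is a standalone copy of the helper above (reproduced outside `JettyUtils` purely for demonstration), showing the IPv6 branch in action: a bare IPv6 literal contains `':'`, so it is bracketed to keep the port unambiguous.

```scala
// Standalone reproduction of the newURL helper above, for illustration only.
def newURL(scheme: String, server: String, port: Int, path: String, query: String): String = {
  val builder = new StringBuilder
  if (server.indexOf(':') >= 0 && server.charAt(0) != '[') {
    // A bare IPv6 literal contains ':', so wrap it in brackets
    builder.append(scheme).append("://").append('[').append(server).append(']')
  } else {
    builder.append(scheme).append("://").append(server)
  }
  builder.append(':').append(port)
  builder.append(path)
  if (query != null && query.length > 0) builder.append('?').append(query)
  builder.toString
}

val v6 = newURL("https", "::1", 8443, "/jobs", "id=1")    // "https://[::1]:8443/jobs?id=1"
val v4 = newURL("https", "example.com", 8443, "/", null)  // "https://example.com:8443/"
```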

private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
val redirectHandler: ContextHandler = new ContextHandler
redirectHandler.setContextPath("/")
redirectHandler.setHandler(new AbstractHandler {
override def handle(
target: String,
baseRequest: Request,
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
if (baseRequest.isSecure) {
return
}
val httpsURL = newURL(scheme, baseRequest.getServerName, securePort,
baseRequest.getRequestURI, baseRequest.getQueryString)
response.setContentLength(0)
response.sendRedirect(response.encodeRedirectURL(httpsURL))
baseRequest.setHandled(true)
}
})
redirectHandler
}

/** Attach a prefix to the given path, but avoid returning an empty path */
private def attachPrefix(basePath: String, relativePath: String): String = {
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
}

private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = {
val ctxFactory = new SslContextFactory()
conf.getAll
.filter { case (k, v) => k.startsWith("spark.ui.ssl.") }
.foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) }

val connector = new SslSelectChannelConnector(ctxFactory)
connector.setPort(port)
connector
}

private def setSslContextFactoryProps(
key: String, value: String, ctxFactory: SslContextFactory) = {
key match {
case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean)
case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value)
case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value)
case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value)
case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value)
case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value)
case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value)
case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value)
}
}
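The selection of SSL settings in `buildSslSelectChannelConnector` is a plain prefix filter over the configuration, which `setSslContextFactoryProps` then maps onto the `SslContextFactory` setters. A hedged sketch of that filtering step, with a plain `Map` standing in for `SparkConf` (all values hypothetical):

```scala
// Plain Map standing in for SparkConf; keys/values are hypothetical examples.
val conf = Map(
  "spark.ui.https.enabled" -> "true",
  "spark.ui.ssl.server.keystore.location" -> "/path/to/spark.keystore",
  "spark.master" -> "local[*]"
)

// Only the spark.ui.ssl.* keys are forwarded to the SslContextFactory
val sslProps = conf.filter { case (k, _) => k.startsWith("spark.ui.ssl.") }
```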

}

private[spark] case class ServerInfo(
Binary file added core/src/test/resources/spark.keystore
@@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite {
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
43 changes: 43 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -124,6 +124,30 @@ class UISuite extends FunSuite {
server.close()
}

test("jetty with https selects different port under contention") {
val server = new ServerSocket(0)
val startPort = server.getLocalPort

val sparkConf = new SparkConf()
.set("spark.ui.https.enabled", "true")
.set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
.set("spark.ui.ssl.server.keystore.password", "123456")
.set("spark.ui.ssl.server.keystore.keypassword", "123456")
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1")
val serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2")
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
serverInfo1.server.stop()
serverInfo2.server.stop()
server.close()
}

test("jetty binds to port 0 correctly") {
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
@@ -137,6 +161,25 @@
}
}

test("jetty with https binds to port 0 correctly") {
val sparkConf = new SparkConf()
.set("spark.ui.https.enabled", "true")
.set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
.set("spark.ui.ssl.server.keystore.password", "123456")
.set("spark.ui.ssl.server.keystore.keypassword", "123456")
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
Try { new ServerSocket(boundPort) } match {
case Success(s) => fail("Port %s doesn't seem to be used by the jetty server".format(boundPort))
case Failure(e) =>
}
serverInfo.server.stop()
}

test("verify appUIAddress contains the scheme") {
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
59 changes: 58 additions & 1 deletion docs/security.md
@@ -11,12 +11,69 @@ Spark currently supports authentication via a shared secret. Authentication can

## Web UI

The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.
The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting and by using [Jetty https/SSL](http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html) via the `spark.ui.https.enabled` setting.

### Authentication

A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.

Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces.

Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.

### Encryption

Spark uses SSL (Secure Sockets Layer) to establish an encrypted link between the UI server and the browser client. The config `spark.ui.https.enabled` is the switch that turns encryption on; the other SSL-related configs are as follows:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.ui.https.enabled</td>
<td>false</td>
  <td>Whether to enable HTTPS in the web UI.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.keystore.keypassword</td>
<td>(none)</td>
<td>The password for the specific key within the key store.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.keystore.location</td>
<td>(none)</td>
<td>The file or URL of the SSL Key store.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.keystore.password</td>
<td>(none)</td>
<td>The password for the key store.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.keystore.type</td>
<td>JKS</td>
<td>The type of the key store (default "JKS").</td>
</tr>
<tr>
<td>spark.ui.ssl.client.https.needAuth</td>
<td>(none)</td>
  <td>Set to true if SSL needs client authentication.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.truststore.location</td>
<td>(none)</td>
<td>The file name or URL of the trust store location.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.truststore.password</td>
<td>(none)</td>
  <td>The password for the trust store.</td>
</tr>
<tr>
<td>spark.ui.ssl.server.truststore.type</td>
<td>JKS</td>
  <td>The type of the trust store (default "JKS").</td>
</tr>
</table>
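Putting the table together, a minimal `spark-defaults.conf` enabling HTTPS for the web UI might look like the following. The keystore path and passwords are placeholders, not values from this patch:

```
spark.ui.https.enabled                     true
spark.ui.ssl.server.keystore.location      /path/to/spark.keystore
spark.ui.ssl.server.keystore.password      changeit
spark.ui.ssl.server.keystore.keypassword   changeit
```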

## Event Logging

If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.