Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2750][WEB UI]Add Https support for Web UI #5664

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e8256e5
support https in spark web ui
scwf Aug 16, 2014
8f7cc96
add unit test
scwf Aug 16, 2014
c90d84e
fix according to comments
scwf Sep 21, 2014
de8d1bd
fix scalastyle
scwf Sep 21, 2014
35074fd
fix workerinfo in JsonProtocolSuite
scwf Sep 21, 2014
967950f
Merge branch 'master' of github.com:apache/spark into https
scwf Sep 21, 2014
9591c9c
import org.eclipse.jetty.server.Server to fix test error
scwf Sep 21, 2014
333334a
fix comments
scwf Sep 25, 2014
64d7dc0
add redirect from http to https
scwf Oct 1, 2014
89bf986
revert debug code
scwf Oct 1, 2014
8e5132d
merge with apache/master
scwf Oct 1, 2014
677b746
add https/ssl to docs
scwf Oct 1, 2014
a4ce923
fix docs
scwf Oct 1, 2014
6c31dc7
fix code format
scwf Oct 1, 2014
7a898fb
fix securePort
scwf Oct 1, 2014
a29ec86
fix conflict with apache/master
scwf Oct 1, 2014
55bbc5f
merge with apache/master and fix conflict
scwf Oct 3, 2014
e5c87cb
fix comments by JoshRosen
scwf Oct 3, 2014
baaa1ce
fix conflict
scwf Oct 4, 2014
a48c6fc
address JoshRosen's comments
scwf Oct 5, 2014
2dadb2f
address vanzin's comments
scwf Oct 7, 2014
8b32853
Merge branch 'master' of https://github.com/apache/spark into https
scwf Oct 7, 2014
3b01d3a
add reference to method newURI
scwf Oct 10, 2014
4ae834b
Merge branch 'master' of https://github.com/apache/spark into https
scwf Oct 10, 2014
d80f7e9
rebase based on https://github.com/apache/spark/pull/1980
WangTaoTheTonic Apr 23, 2015
9deebf3
rewrite using SSLOptions
WangTaoTheTonic Apr 25, 2015
7def14e
fix uisuites
WangTaoTheTonic Apr 25, 2015
18982b4
use spark.ssl.ui.* instead, update docs, fixes in Suite and other min…
WangTaoTheTonic May 4, 2015
dfbe1d6
per Marcelo's comments
WangTaoTheTonic May 5, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms to use
*/
Expand All @@ -45,8 +48,11 @@ private[spark] case class SSLOptions(
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty) {

Expand All @@ -58,12 +64,18 @@ private[spark] case class SSLOptions(
val sslContextFactory = new SslContextFactory()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
if (needClientAuth) {
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
trustStoreType.foreach(sslContextFactory.setTrustStoreType)
}
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
if (enabledAlgorithms.nonEmpty) {
sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
}

Some(sslContextFactory)
} else {
Expand Down Expand Up @@ -119,9 +131,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand Down Expand Up @@ -149,12 +164,21 @@ private[spark] object SSLOptions extends Logging {
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getOption(s"$ns.keyStoreType")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you adding these new properties but I don't see them used? Shouldn't createJettySslContextFactory and potentially createAkkaConfig be modified?

.orElse(defaults.flatMap(_.keyStoreType))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getOption(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -168,8 +192,11 @@ private[spark] object SSLOptions extends Logging {
keyStore,
keyStorePassword,
keyPassword,
keyStoreType,
needClientAuth,
trustStore,
trustStorePassword,
trustStoreType,
protocol,
enabledAlgorithms)
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,11 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
val webUISSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.ui", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
logDebug(s"SSLConfiguration for web UI: $webUISSLOptions")

val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ private[deploy] object DeployMessages {
port: Int,
cores: Int,
memory: Int,
webUiPort: Int,
publicAddress: String)
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private[master] class Master(
System.exit(0)
}

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
Expand All @@ -237,7 +237,7 @@ private[master] class Master(
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
sender, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
val publicAddress: String)
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Expand Down Expand Up @@ -100,10 +99,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}

def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

def setState(state: WorkerState.Value): Unit = {
this.state = state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ private[worker] class Worker(
private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
private val APP_DATA_RETENTION_SECS =
private val APP_DATA_RETENTION_SECS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

private val testing: Boolean = sys.props.contains("spark.testing")
private var master: ActorSelection = null
private var masterAddress: Address = null
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
private val akkaUrl = AkkaUtils.address(
AkkaUtils.protocol(context.system),
actorSystemName,
Expand Down Expand Up @@ -172,6 +173,7 @@ private[worker] class Worker(
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = "http://" + publicAddress + ":" + webUi.boundPort
registerWithMaster()

metricsSystem.registerSource(workerSource)
Expand All @@ -197,7 +199,7 @@ private[worker] class Worker(
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl)
}
}

Expand Down Expand Up @@ -236,7 +238,7 @@ private[worker] class Worker(
*/
if (master != null) {
master ! RegisterWorker(
workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
workerId, host, port, cores, memory, workerWebUiUrl)
} else {
// We are retrying the initial registration
tryRegisterAllMasters()
Expand Down
73 changes: 68 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@

package org.apache.spark.ui

import java.net.{InetSocketAddress, URL}
import java.net.{URI, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import scala.collection.mutable.{ArrayBuffer, StringBuilder}
import scala.language.implicitConversions
import scala.xml.Node

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.{Connector, Request, Server}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.eclipse.jetty.server.nio.SelectChannelConnector
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector

import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SSLOptions, SecurityManager, SparkConf}
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -201,17 +205,41 @@ private[spark] object JettyUtils extends Logging {
def startJettyServer(
hostName: String,
port: Int,
securityManager: SecurityManager,
handlers: Seq[ServletContextHandler],
conf: SparkConf,
serverName: String = ""): ServerInfo = {

val collection = new ContextHandlerCollection
collection.setHandlers(handlers.toArray)
addFilters(handlers, conf)

// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val server = new Server
val connectors = new ArrayBuffer[Connector]
// Create a connector on port currentPort to listen for HTTP requests
val httpConnector = new SelectChannelConnector()
httpConnector.setPort(currentPort)
connectors += httpConnector

val sslContextFactory = securityManager.webUISSLOptions.createJettySslContextFactory()
sslContextFactory.foreach { factory =>
// If the new port wraps around, do not try a privilege port
val securePort = (currentPort + 400 - 1024) % (65536 - 1024) + 1024
val scheme = "https"
// Create a connector on port securePort to listen for HTTPS requests
val connector = new SslSelectChannelConnector(factory)
connector.setPort(securePort)
connectors += connector

// redirect the HTTP requests to HTTPS port
collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
}

handlers.foreach(collection.addHandler)
connectors.foreach(_.setHost(hostName))
server.setConnectors(connectors.toArray)

val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
Expand All @@ -231,6 +259,41 @@ private[spark] object JettyUtils extends Logging {
ServerInfo(server, boundPort, collection)
}

private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
val redirectHandler: ContextHandler = new ContextHandler
redirectHandler.setContextPath("/")
redirectHandler.setHandler(new AbstractHandler {
override def handle(
target: String,
baseRequest: Request,
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
if (baseRequest.isSecure) {
return
}
val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
baseRequest.getRequestURI, baseRequest.getQueryString)
response.setContentLength(0)
response.encodeRedirectURL(httpsURI)
response.sendRedirect(httpsURI)
baseRequest.setHandled(true)
}
})
redirectHandler
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
s"[${server}]"
} else {
server
}
val authority = s"$redirectServer:$port"
new URI(scheme, authority, path, query, null).toString
}

/** Attach a prefix to the given path, but avoid returning an empty path */
private def attachPrefix(basePath: String, relativePath: String): String = {
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[spark] abstract class WebUI(
def bind() {
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
serverInfo = Some(startJettyServer("0.0.0.0", port, securityManager, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
Expand Down
Binary file added core/src/test/resources/spark.keystore
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class JsonProtocolSuite extends FunSuite {
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
Expand Down
Loading