Skip to content

Commit

Permalink
[SPARK-2750][WEB UI] Add https support to the Web UI
Browse files Browse the repository at this point in the history
Author: scwf <[email protected]>
Author: Marcelo Vanzin <[email protected]>
Author: WangTaoTheTonic <[email protected]>
Author: w00228970 <[email protected]>

Closes #10238 from vanzin/SPARK-2750.
  • Loading branch information
scwf authored and Marcelo Vanzin committed Jan 19, 2016
1 parent efd7eed commit 43f1d59
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 93 deletions.
50 changes: 45 additions & 5 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
Expand All @@ -49,8 +52,11 @@ private[spark] case class SSLOptions(
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
Expand All @@ -63,12 +69,18 @@ private[spark] case class SSLOptions(
val sslContextFactory = new SslContextFactory()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
if (needClientAuth) {
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
trustStoreType.foreach(sslContextFactory.setTrustStoreType)
}
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
if (supportedAlgorithms.nonEmpty) {
sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
}

Some(sslContextFactory)
} else {
Expand All @@ -82,6 +94,13 @@ private[spark] case class SSLOptions(
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
if (keyStoreType.isDefined) {
logWarning("Akka configuration does not support key store type.");
}
if (trustStoreType.isDefined) {
logWarning("Akka configuration does not support trust store type.");
}

Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
Expand Down Expand Up @@ -110,7 +129,9 @@ private[spark] case class SSLOptions(
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
*/
private val supportedAlgorithms: Set[String] = {
private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
Set()
} else {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
Expand All @@ -133,7 +154,11 @@ private[spark] case class SSLOptions(
logDebug(s"Discarding unsupported cipher $cipher")
}

enabledAlgorithms & providerAlgorithms
val supported = enabledAlgorithms & providerAlgorithms
require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"),
"SSLContext does not support any of the enabled algorithms: " +
enabledAlgorithms.mkString(","))
supported
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
Expand All @@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand Down Expand Up @@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging {
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getOption(s"$ns.keyStoreType")
.orElse(defaults.flatMap(_.keyStoreType))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getOption(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging {
keyStore,
keyStorePassword,
keyPassword,
keyStoreType,
needClientAuth,
trustStore,
trustStorePassword,
trustStoreType,
protocol,
enabledAlgorithms)
}
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

// SSL configuration for different communication layers - they can override the default
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")

// SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
val fileServerSSLOptions = getSSLOptions("fs")
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
for (trustStore <- fileServerSSLOptions.trustStore) yield {
Expand Down Expand Up @@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
(None, None)
}

def getSSLOptions(module: String): SSLOptions = {
val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
logDebug(s"Created SSL options for $module: $opts")
opts
}

/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ private[deploy] object DeployMessages {
worker: RpcEndpointRef,
cores: Int,
memory: Int,
webUiPort: Int,
publicAddress: String)
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
extends WebUI(securityManager, port, conf) with Logging with UIRoot {
extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
with Logging with UIRoot {

// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
Expand Down Expand Up @@ -233,7 +234,7 @@ object HistoryServer extends Logging {

val UI_PATH_PREFIX = "/history"

def main(argStrings: Array[String]) {
def main(argStrings: Array[String]): Unit = {
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private[deploy] class Master(

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
Expand All @@ -392,7 +392,7 @@ private[deploy] class Master(
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
val webUiPort: Int,
val publicAddress: String)
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Expand Down Expand Up @@ -98,10 +97,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}

def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

def setState(state: WorkerState.Value): Unit = {
this.state = state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class MasterWebUI(
val master: Master,
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
with UIRoot {
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[spark] class MesosClusterUI(
conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, port, conf) {
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {

initialize()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private[deploy] class Worker(
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
Expand Down Expand Up @@ -184,6 +185,9 @@ private[deploy] class Worker(
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()

val scheme = if (webUi.sslOptions.enabled) "https" else "http"
workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
registerWithMaster()

metricsSystem.registerSource(workerSource)
Expand Down Expand Up @@ -336,7 +340,7 @@ private[deploy] class Worker(

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class WorkerWebUI(
val worker: Worker,
val workDir: File,
requestedPort: Int)
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
Expand Down
Loading

0 comments on commit 43f1d59

Please sign in to comment.