[SPARK-2750] support https in spark web ui #1980

Closed · wants to merge 24 commits (diff below shows changes from 2 commits)
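For context, a minimal sketch of how a deployment might enable HTTPS with the configuration keys this patch introduces (the keystore path and passwords are illustrative values, not defaults):

val conf = new SparkConf()
  .set("spark.http.policy", "https")
  .set("spark.ssl.server.keystore.location", "/path/to/spark.keystore")
  .set("spark.ssl.server.keystore.password", "keystorePassword")
  .set("spark.ssl.server.keystore.keypassword", "keyPassword")
  .set("spark.ssl.server.keystore.type", "jks")
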
core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala
@@ -39,7 +39,7 @@ private[deploy] object DeployMessages {
port: Int,
cores: Int,
memory: Int,
webUiPort: Int,
workerWebUiUrl: String,
publicAddress: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -122,7 +122,8 @@ private[spark] class Master(
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://"
masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

masterMetricsSystem.registerSource(masterSource)
@@ -190,7 +191,7 @@ private[spark] class Master(
System.exit(0)
}

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
@@ -200,7 +201,7 @@ private[spark] class Master(
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
sender, workerWebUiUrl, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -30,7 +30,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
val webUiAddress: String,
val publicAddress: String)

Reviewer comment: It looks like this publicAddress field is never read anymore, so we should remove it.

extends Serializable {

@@ -99,10 +99,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}

def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

def setState(state: WorkerState.Value) = {
this.state = state
}
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -78,6 +78,7 @@ private[spark] class Worker(
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
var workerWebUiUrl: String = _

Reviewer comment: For stylistic consistency with the surrounding code, I'd prefer if this were initialized with an empty string.

@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
@@ -138,6 +139,7 @@ private[spark] class Worker(
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort
registerWithMaster()

metricsSystem.registerSource(workerSource)
@@ -163,7 +165,7 @@ private[spark] class Worker(
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress)
}
}

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -53,6 +53,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
if (conf.contains("worker.ui.port")) {

Reviewer comment: This should be spark.worker.ui.port.

Reviewer comment: In fact, spark.worker.ui.port is already read on line 51, so this check is unnecessary.

webUiPort = conf.get("worker.ui.port").toInt
}
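
If the guard were kept at all (the second comment above suggests it is redundant), the first comment implies it would read, as a sketch:

if (conf.contains("spark.worker.ui.port")) {
  webUiPort = conf.get("spark.worker.ui.port").toInt
}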

parse(args.toList)

49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -26,7 +26,7 @@ import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
import scala.xml.Node

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.{Connector, Server}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -35,6 +35,8 @@ import org.json4s.jackson.JsonMethods.{pretty, render}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.Utils
import org.eclipse.jetty.server.nio.SelectChannelConnector

Reviewer comment: These imports should be placed alongside the other jetty imports (e.g. line 32).

import org.eclipse.jetty.server.ssl.SslSelectChannelConnector

/**
* Utilities for launching a web server using Jetty's HTTP Server class
@@ -183,7 +185,8 @@

// Bind to the given port, or throw a java.net.BindException if the port is occupied

Reviewer comment: It looks like we used to bind Jetty to a specific hostname/interface, but it doesn't look like the new code uses hostName anywhere. To preserve the same behavior, maybe we should call connector.setHost() after connector.setPort() in the code below.

Author reply: Only setHost() is needed here; the port has already been set in getConnector(currentPort, conf).

def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val server = new Server
server.addConnector(getConnector(currentPort, conf))
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
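
Per the setHost() discussion above, a minimal sketch of how connect() could preserve the old bind-to-hostname behavior; the remainder of the method is elided:

def connect(currentPort: Int): (Server, Int) = {
  val server = new Server
  val connector = getConnector(currentPort, conf)
  // Restore the old behavior of binding only to the requested hostname/interface;
  // getConnector has already called setPort(currentPort).
  connector.setHost(hostName)
  server.addConnector(connector)
  val pool = new QueuedThreadPool
  pool.setDaemon(true)
  server.setThreadPool(pool)
  // ... rest of connect() unchanged (start the server and return it with its bound port)
}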
@@ -207,6 +210,48 @@
private def attachPrefix(basePath: String, relativePath: String): String = {
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
}

private def getConnector(port: Int, conf: SparkConf): Connector = {
val https = getHttpPolicy(conf)
if (https) {
buildSslSelectChannelConnector(port, conf)
} else {
conf.set("spark.http.policy", "http")

Reviewer comment: Mutating the configuration object seems like an anti-pattern to me; can we avoid this?

val connector = new SelectChannelConnector
connector.setPort(port)
connector
}
}
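
One way to address the mutation concern above, as a minimal sketch: treat plain HTTP as the default when spark.http.policy is unset instead of writing the default back into the SparkConf (same names as in this patch):

private def getConnector(port: Int, conf: SparkConf): Connector = {
  // Default to plain HTTP unless spark.http.policy is explicitly "https";
  // nothing is written back into the SparkConf.
  if (conf.get("spark.http.policy", "http") == "https") {
    buildSslSelectChannelConnector(port, conf)
  } else {
    val connector = new SelectChannelConnector
    connector.setPort(port)
    connector
  }
}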

private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector =
{
val connector = new SslSelectChannelConnector
connector.setPort(port)

val context = connector.getSslContextFactory
val needAuth = conf.getBoolean("spark.client.https.need-auth", false)

context.setNeedClientAuth(needAuth)
context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456"))

Reviewer comment: Shouldn't this be configurable instead of hardcoded?

Reviewer comment: Also, we should include easy-to-follow instructions to help users properly configure this. http://wiki.eclipse.org/Jetty/Howto/Configure_SSL ?

Reviewer comment: It looks like we don't specify a default for spark.ssl.server.keystore.location or for spark.ssl.server.truststore.password, so we probably shouldn't set one for spark.ssl.server.keystore.keypassword either. Maybe we should log a warning if a user attempts to enable HTTPS without having properly configured their keystore.
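
A sketch of the kind of guard the reviewer describes; the key names match this patch, while the exact message and placement are illustrative:

if (conf.get("spark.http.policy", "") == "https" &&
    !conf.contains("spark.ssl.server.keystore.location")) {
  // JettyUtils extends Logging, so logWarning is in scope here.
  logWarning("spark.http.policy is set to https, but spark.ssl.server.keystore.location " +
    "is not configured; the web UI cannot start an SSL connector without a keystore.")
}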

Reviewer comment: Something I'd recommend here instead is to just pass options through to Jetty, e.g. some untested code based on similar Java code I have around:

val ctxFactory = new SslContextFactory()
conf.getAll
  .filter { case (k, v) => k.startsWith("spark.ui.ssl.") }
  .foreach { case (k, v) => /* set property k on ctxFactory */ }
val connector = new SslSelectChannelConnector(ctxFactory)

The trick is setting the property; in my code I use BeanUtils.setProperty() for that. My main concern here is that Jetty's SSL connector has a bunch of configurable options, and a pass-through mechanism would avoid having to plumb all of them manually.

Author reply: Thanks, pretty good advice.

context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location"))
context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456"))
context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks"))

if (needAuth && conf.contains("spark.ssl.server.truststore.location")) {
context.setTrustStore(conf.get("spark.ssl.server.truststore.location"))
context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password"))
context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks"))
}
connector
}
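
A slightly fuller sketch of the pass-through idea from the review thread above, assuming a spark.ui.ssl.* key prefix and Apache Commons BeanUtils (neither is part of this patch):

import org.apache.commons.beanutils.BeanUtils
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.eclipse.jetty.util.ssl.SslContextFactory

private def sslConnectorFromConf(port: Int, conf: SparkConf): SslSelectChannelConnector = {
  val ctxFactory = new SslContextFactory()
  // Forward every spark.ui.ssl.* entry to the matching SslContextFactory bean property,
  // e.g. spark.ui.ssl.keyStorePath -> ctxFactory.setKeyStorePath(...).
  conf.getAll
    .filter { case (k, _) => k.startsWith("spark.ui.ssl.") }
    .foreach { case (k, v) => BeanUtils.setProperty(ctxFactory, k.stripPrefix("spark.ui.ssl."), v) }
  val connector = new SslSelectChannelConnector(ctxFactory)
  connector.setPort(port)
  connector
}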

def getHttpPolicy(conf: SparkConf): Boolean = {

Reviewer comment: This is a confusing name, since this function returns a boolean but sounds like it will return an "http policy". isHttpsEnabled would be a better name, although I'm not sure that we need this at all; we should just be able to do

val https = conf.get("spark.http.policy", "") == "https"

instead.

if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) {
true
} else {
false
}
}
}

private[spark] case class ServerInfo(
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -97,7 +97,9 @@ private[spark] class SparkUI(
*/
private[spark] def appUIHostPort = publicHostName + ":" + boundPort

private[spark] def appUIAddress = s"http://$appUIHostPort"
private def appUiAddressPrefix = conf.get("spark.http.policy")

private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort"
}

private[spark] object SparkUI {
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -99,7 +99,11 @@ private[spark] abstract class WebUI(
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
if (conf.get("spark.http.policy").equals("https")) {
logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort))
} else {
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
}
} catch {
case e: Exception =>
logError("Failed to bind %s".format(className), e)
Binary file added core/src/test/resources/spark.keystore
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite {
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80", "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}
40 changes: 40 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -115,6 +115,30 @@ class UISuite extends FunSuite {
assert(boundPort1 != boundPort2)
}

test("jetty with https selects different port under contention") {
val startPort = 4040
val server = new Server(startPort)

Try { server.start() } match {
case Success(s) =>
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
val sparkConf = new SparkConf()
.set("spark.http.policy", "https")
.set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf)
val serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf)
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
}

test("jetty binds to port 0 correctly") {
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
@@ -128,6 +152,22 @@
}
}

test("jetty with https binds to port 0 correctly") {
val sparkConf = new SparkConf()
.set("spark.http.policy", "https")
.set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
Try { new ServerSocket(boundPort) } match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
case Failure(e) =>
}
}

test("verify appUIAddress contains the scheme") {
withSpark(new SparkContext("local", "test")) { sc =>
val uiAddress = sc.ui.appUIAddress