-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2750] support https in spark web ui #1980
Changes from 2 commits
e8256e5
8f7cc96
c90d84e
de8d1bd
35074fd
967950f
9591c9c
333334a
64d7dc0
89bf986
8e5132d
677b746
a4ce923
6c31dc7
7a898fb
a29ec86
55bbc5f
e5c87cb
baaa1ce
a48c6fc
2dadb2f
8b32853
3b01d3a
4ae834b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,7 @@ private[spark] class Worker( | |
var activeMasterUrl: String = "" | ||
var activeMasterWebUiUrl : String = "" | ||
val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) | ||
var workerWebUiUrl: String = _ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For stylistic consistency with the surrounding code, I'd prefer if this were initialized with an empty string. |
||
@volatile var registered = false | ||
@volatile var connected = false | ||
val workerId = generateWorkerId() | ||
|
@@ -138,6 +139,7 @@ private[spark] class Worker( | |
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) | ||
webUi = new WorkerWebUI(this, workDir, webUiPort) | ||
webUi.bind() | ||
workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort | ||
registerWithMaster() | ||
|
||
metricsSystem.registerSource(workerSource) | ||
|
@@ -163,7 +165,7 @@ private[spark] class Worker( | |
for (masterUrl <- masterUrls) { | ||
logInfo("Connecting to master " + masterUrl + "...") | ||
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) | ||
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) | ||
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { | |
if (System.getenv("SPARK_WORKER_DIR") != null) { | ||
workDir = System.getenv("SPARK_WORKER_DIR") | ||
} | ||
if (conf.contains("worker.ui.port")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, |
||
webUiPort = conf.get("worker.ui.port").toInt | ||
} | ||
|
||
parse(args.toList) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import scala.language.implicitConversions | |
import scala.util.{Failure, Success, Try} | ||
import scala.xml.Node | ||
|
||
import org.eclipse.jetty.server.Server | ||
import org.eclipse.jetty.server.{Connector, Server} | ||
import org.eclipse.jetty.server.handler._ | ||
import org.eclipse.jetty.servlet._ | ||
import org.eclipse.jetty.util.thread.QueuedThreadPool | ||
|
@@ -35,6 +35,8 @@ import org.json4s.jackson.JsonMethods.{pretty, render} | |
|
||
import org.apache.spark.{Logging, SecurityManager, SparkConf} | ||
import org.apache.spark.util.Utils | ||
import org.eclipse.jetty.server.nio.SelectChannelConnector | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These imports should be placed alongside the other `org.eclipse.jetty` imports. |
||
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector | ||
|
||
/** | ||
* Utilities for launching a web server using Jetty's HTTP Server class | ||
|
@@ -183,7 +185,8 @@ private[spark] object JettyUtils extends Logging { | |
|
||
// Bind to the given port, or throw a java.net.BindException if the port is occupied | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It looks like we used to bind Jetty to a specific hostname / interface, but it doesn't look like the new code uses `hostName`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. here should only |
||
def connect(currentPort: Int): (Server, Int) = { | ||
val server = new Server(new InetSocketAddress(hostName, currentPort)) | ||
val server = new Server | ||
server.addConnector(getConnector(currentPort, conf)) | ||
val pool = new QueuedThreadPool | ||
pool.setDaemon(true) | ||
server.setThreadPool(pool) | ||
|
@@ -207,6 +210,48 @@ private[spark] object JettyUtils extends Logging { | |
private def attachPrefix(basePath: String, relativePath: String): String = { | ||
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") | ||
} | ||
|
||
private def getConnector(port: Int, conf: SparkConf): Connector = { | ||
val https = getHttpPolicy(conf) | ||
if (https) { | ||
buildSslSelectChannelConnector(port, conf) | ||
} else { | ||
conf.set("spark.http.policy", "http") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Mutating the configuration object seems like an anti-pattern to me; can we avoid this? |
||
val connector = new SelectChannelConnector | ||
connector.setPort(port) | ||
connector | ||
} | ||
} | ||
|
||
private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = | ||
{ | ||
val connector = new SslSelectChannelConnector | ||
connector.setPort(port) | ||
|
||
val context = connector.getSslContextFactory | ||
val needAuth = conf.getBoolean("spark.client.https.need-auth", false) | ||
|
||
context.setNeedClientAuth(needAuth) | ||
context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be configurable instead of hardcoded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, we should include easy-to-follow instructions to help users properly configure this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we don't specify a default for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something I'd recommend here instead is to just bypass options to Jetty. e.g., some untested code based on similar Java code I have around:
The trick is setting the property; in my code I use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks, pretty good advice |
||
context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) | ||
context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) | ||
context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) | ||
|
||
if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { | ||
context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) | ||
context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) | ||
context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) | ||
} | ||
connector | ||
} | ||
|
||
def getHttpPolicy(conf: SparkConf): Boolean = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a confusing name, since this function returns a boolean but sounds like it will return a "http policy". We should just be able to do
instead. |
||
if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) { | ||
true | ||
} else { | ||
false | ||
} | ||
} | ||
} | ||
|
||
private[spark] case class ServerInfo( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this `publicAddress` field is never read anymore, so we should remove it.