-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2750] support https in spark web ui #1980
Changes from 22 commits
e8256e5
8f7cc96
c90d84e
de8d1bd
35074fd
967950f
9591c9c
333334a
64d7dc0
89bf986
8e5132d
677b746
a4ce923
6c31dc7
7a898fb
a29ec86
55bbc5f
e5c87cb
baaa1ce
a48c6fc
2dadb2f
8b32853
3b01d3a
4ae834b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,17 +17,23 @@ | |
|
||
package org.apache.spark.ui | ||
|
||
import java.net.{InetSocketAddress, URL} | ||
import java.net.URL | ||
import javax.servlet.DispatcherType | ||
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} | ||
|
||
import scala.collection.mutable.StringBuilder | ||
import scala.language.implicitConversions | ||
import scala.xml.Node | ||
|
||
import org.eclipse.jetty.server.Server | ||
import org.eclipse.jetty.server.{Request, Connector, Server} | ||
import org.eclipse.jetty.server.handler._ | ||
import org.eclipse.jetty.servlet._ | ||
import org.eclipse.jetty.util.thread.QueuedThreadPool | ||
import org.eclipse.jetty.server.nio.SelectChannelConnector | ||
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector | ||
import org.eclipse.jetty.http.HttpStatus | ||
import org.eclipse.jetty.util.ssl.SslContextFactory | ||
|
||
import org.json4s.JValue | ||
import org.json4s.jackson.JsonMethods.{pretty, render} | ||
|
||
|
@@ -180,12 +186,32 @@ private[spark] object JettyUtils extends Logging { | |
serverName: String = ""): ServerInfo = { | ||
|
||
val collection = new ContextHandlerCollection | ||
collection.setHandlers(handlers.toArray) | ||
addFilters(handlers, conf) | ||
|
||
// Bind to the given port, or throw a java.net.BindException if the port is occupied | ||
def connect(currentPort: Int): (Server, Int) = { | ||
val server = new Server(new InetSocketAddress(hostName, currentPort)) | ||
val server = new Server | ||
// Create a connector on port currentPort to listen for HTTP requests | ||
val httpConnector = new SelectChannelConnector() | ||
httpConnector.setPort(currentPort) | ||
httpConnector.setHost(hostName) | ||
|
||
if (conf.get("spark.ui.https.enabled", "false").toBoolean) { | ||
// / If the new port wraps around, do not try a privilege port | ||
val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if this port isn't free? It's dangerous to make assumptions about whether a particular ports is free, especially since this can cause problems in Jenkins, where we might have multiple builds running concurrently on the same machine that are contending for ports. Could we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey, note There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see. Thanks for explaining. |
||
val scheme = "https" | ||
// Create a connector on port securePort to listen for HTTPS requests | ||
val connector = buildSslSelectChannelConnector(securePort, conf) | ||
connector.setHost(hostName) | ||
server.setConnectors(Seq(httpConnector,connector).toArray) | ||
|
||
// redirect the HTTP requests to HTTPS port | ||
val newHandlers = Seq(createRedirectHttpsHandler(securePort, scheme)) ++ handlers | ||
collection.setHandlers(newHandlers.toArray) | ||
} else { | ||
server.addConnector(httpConnector) | ||
collection.setHandlers(handlers.toArray) | ||
} | ||
val pool = new QueuedThreadPool | ||
pool.setDaemon(true) | ||
server.setThreadPool(pool) | ||
|
@@ -205,10 +231,74 @@ private[spark] object JettyUtils extends Logging { | |
ServerInfo(server, boundPort, collection) | ||
} | ||
|
||
// to generate a new url string scheme://server:port+path | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @scwf, The reason I asked for a comment is that it seems like the method is doing a little more than just that. For example, L238 seems to be doing some sort of parsing of the It would be nice if the comment explained exactly what the relationship between the input and the output is. A unit test wouldn't hurt either. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @vanzin, actually here i just refer to the code of jetty 9 see(https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/jetty/util/URIUtil.java#L726-L733) since there is no And L238 is for the case to handle IPv6 address, here we can remove it if unnecessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you put that reference in the code itself, so we know where it comes from? Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, updated. |
||
private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = { | ||
val builder = new StringBuilder | ||
|
||
if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { | ||
builder.append(scheme).append("://").append('[').append(server).append(']') | ||
} else { | ||
builder.append(scheme).append("://").append(server) | ||
} | ||
builder.append(':').append(port) | ||
builder.append(path) | ||
if (query != null && query.length > 0) builder.append('?').append(query) | ||
builder.toString | ||
} | ||
|
||
private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { | ||
val redirectHandler: ContextHandler = new ContextHandler | ||
redirectHandler.setContextPath("/") | ||
redirectHandler.setHandler(new AbstractHandler { | ||
override def handle( | ||
target: String, | ||
baseRequest: Request, | ||
request: HttpServletRequest, | ||
response: HttpServletResponse): Unit = { | ||
if (baseRequest.isSecure) { | ||
return | ||
} | ||
val httpsURL = newURL(scheme, baseRequest.getServerName, securePort, | ||
baseRequest.getRequestURI, baseRequest.getQueryString) | ||
response.setContentLength(0) | ||
response.encodeRedirectURL(httpsURL) | ||
response.sendRedirect(httpsURL) | ||
baseRequest.setHandled(true) | ||
} | ||
}) | ||
redirectHandler | ||
} | ||
|
||
/** Attach a prefix to the given path, but avoid returning an empty path */ | ||
private def attachPrefix(basePath: String, relativePath: String): String = { | ||
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") | ||
} | ||
|
||
private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { | ||
val ctxFactory = new SslContextFactory() | ||
conf.getAll | ||
.filter { case (k, v) => k.startsWith("spark.ui.ssl.") } | ||
.foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) } | ||
|
||
val connector = new SslSelectChannelConnector(ctxFactory) | ||
connector.setPort(port) | ||
connector | ||
} | ||
|
||
private def setSslContextFactoryProps( | ||
key: String, value: String, ctxFactory: SslContextFactory) = { | ||
key match { | ||
case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) | ||
case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) | ||
case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) | ||
case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) | ||
case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value) | ||
case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value) | ||
case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) | ||
case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) | ||
} | ||
} | ||
|
||
} | ||
|
||
private[spark] case class ServerInfo( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we used to bind Jetty to a specific hostname / interface, but it doesn't look like the new code uses
hostName
anywhere. To preserve the same behavior, maybe we should callconnector.setHost()
afterconnector.setPort()
in the code below.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here should only
setHost()
, port has been set ingetConnector(currentPort, conf)