Skip to content

Commit

Permalink
use spark.ssl.ui.* instead, update docs, fixes in Suite and other min…
Browse files Browse the repository at this point in the history
…or improvements
  • Loading branch information
WangTaoTheTonic committed May 4, 2015
1 parent 7def14e commit 18982b4
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 165 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,11 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
val webUISSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.ui", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
logDebug(s"SSLConfiguration for Akka: $webUISSLOptions")

val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
Expand Down
37 changes: 16 additions & 21 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ui

import java.net.URL
import java.net.{URI, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

Expand Down Expand Up @@ -205,6 +205,7 @@ private[spark] object JettyUtils extends Logging {
def startJettyServer(
hostName: String,
port: Int,
securityManager: SecurityManager,
handlers: Seq[ServletContextHandler],
conf: SparkConf,
serverName: String = ""): ServerInfo = {
Expand All @@ -221,8 +222,7 @@ private[spark] object JettyUtils extends Logging {
httpConnector.setPort(currentPort)
connectors += httpConnector

val sslContextFactory =
SSLOptions.parse(conf, "spark.ui.https").createJettySslContextFactory()
val sslContextFactory = securityManager.webUISSLOptions.createJettySslContextFactory()
sslContextFactory.foreach { factory =>
// If the new port wraps around, do not try a privilege port
val securePort = (currentPort + 400 - 1024) % (65536 - 1024) + 1024
Expand Down Expand Up @@ -259,23 +259,6 @@ private[spark] object JettyUtils extends Logging {
ServerInfo(server, boundPort, collection)
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports. Based on:
// https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/
// jetty/util/URIUtil.java#L726-L733
private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = {
val builder = new StringBuilder

if (server.indexOf(':') >= 0 && server.charAt(0) != '[') {
builder.append(scheme).append("://").append('[').append(server).append(']')
} else {
builder.append(scheme).append("://").append(server)
}
builder.append(':').append(port)
builder.append(path)
if (query != null && query.length > 0) builder.append('?').append(query)
builder.toString
}

private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
val redirectHandler: ContextHandler = new ContextHandler
redirectHandler.setContextPath("/")
Expand All @@ -288,7 +271,7 @@ private[spark] object JettyUtils extends Logging {
if (baseRequest.isSecure) {
return
}
val httpsURI = newURI(scheme, baseRequest.getServerName, securePort,
val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort,
baseRequest.getRequestURI, baseRequest.getQueryString)
response.setContentLength(0)
response.encodeRedirectURL(httpsURI)
Expand All @@ -299,6 +282,18 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
s"[${server}]"
} else {
server
}
val authority = s"$redirectServer:$port"
new URI(scheme, authority, path, query, null).toString
}

/** Attach a prefix to the given path, but avoid returning an empty path */
private def attachPrefix(basePath: String, relativePath: String): String = {
if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[spark] abstract class WebUI(
def bind() {
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
serverInfo = Some(startJettyServer("0.0.0.0", port, securityManager, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
Expand Down
165 changes: 102 additions & 63 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.apache.spark.ui

import java.net.ServerSocket
import java.net.{BindException, ServerSocket}

import scala.io.Source
import scala.util.{Failure, Success, Try}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.LocalSparkContext._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}

class UISuite extends FunSuite {

Expand Down Expand Up @@ -71,77 +71,106 @@ class UISuite extends FunSuite {
}

test("jetty selects different port under contention") {
val server = new ServerSocket(0)
val startPort = server.getLocalPort
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
val serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
serverInfo1.server.stop()
serverInfo2.server.stop()
server.close()
var server: ServerSocket = null
var serverInfo1: ServerInfo = null
var serverInfo2: ServerInfo = null
val conf = new SparkConf
val securityManager = new SecurityManager(conf)
try {
server = new ServerSocket(0)
val startPort = server.getLocalPort
serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), conf)
serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), conf)
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
} finally {
UISuite.stopServer(serverInfo1.server)
UISuite.stopServer(serverInfo2.server)
UISuite.closeSocket(server)
}
}

test("jetty with https selects different port under contention") {
val server = new ServerSocket(0)
val startPort = server.getLocalPort

val sparkConf = new SparkConf()
.set("spark.ui.https.enabled", "true")
.set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore")
.set("spark.ui.https.keyStorePassword", "123456")
.set("spark.ui.https.keyPassword", "123456")
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1")
val serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2")
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
serverInfo1.server.stop()
serverInfo2.server.stop()
server.close()
var server: ServerSocket = null
var serverInfo1: ServerInfo = null
var serverInfo2: ServerInfo = null
try {
server = new ServerSocket(0)
val startPort = server.getLocalPort

val sparkConf = new SparkConf()
.set("spark.ssl.ui.enabled", "true")
.set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore")
.set("spark.ssl.ui.keyStorePassword", "123456")
.set("spark.ssl.ui.keyPassword", "123456")
val securityManager = new SecurityManager(sparkConf)
serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), sparkConf, "server1")
serverInfo2 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), sparkConf, "server2")
// Allow some wiggle room in case ports on the machine are under contention
val boundPort1 = serverInfo1.boundPort
val boundPort2 = serverInfo2.boundPort
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
} finally {
UISuite.stopServer(serverInfo1.server)
UISuite.stopServer(serverInfo2.server)
UISuite.closeSocket(server)
}
}

test("jetty binds to port 0 correctly") {
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
Try { new ServerSocket(boundPort) } match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
case Failure(e) =>
var socket: ServerSocket = null
var serverInfo: ServerInfo = null
val conf = new SparkConf
val securityManager = new SecurityManager(conf)
try {
serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, securityManager, Seq[ServletContextHandler](), conf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
intercept[BindException] {
socket = new ServerSocket(boundPort)
}
} finally {
UISuite.stopServer(serverInfo.server)
UISuite.closeSocket(socket)
}
}

test("jetty with https binds to port 0 correctly") {
val sparkConf = new SparkConf()
.set("spark.ui.https.enabled", "false")
.set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore")
.set("spark.ui.https.keyStorePassword", "123456")
.set("spark.ui.https.keyPassword", "123456")
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
Try { new ServerSocket(boundPort) } match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
case Failure(e) =>
var socket: ServerSocket = null
var serverInfo: ServerInfo = null
try {
val sparkConf = new SparkConf()
.set("spark.ssl.ui.enabled", "false")
.set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore")
.set("spark.ssl.ui.keyStorePassword", "123456")
.set("spark.ssl.ui.keyPassword", "123456")
val securityManager = new SecurityManager(sparkConf)
serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, securityManager, Seq[ServletContextHandler](), sparkConf)
val server = serverInfo.server
val boundPort = serverInfo.boundPort
assert(server.getState === "STARTED")
assert(boundPort != 0)
intercept[BindException] {
socket = new ServerSocket(boundPort)
}
} finally {
UISuite.stopServer(serverInfo.server)
UISuite.closeSocket(socket)
}
serverInfo.server.stop()
}

test("verify appUIAddress contains the scheme") {
Expand All @@ -162,3 +191,13 @@ class UISuite extends FunSuite {
}
}
}

object UISuite {
def stopServer(server: Server): Unit = {
if (server != null) server.stop
}

def closeSocket(socket: ServerSocket): Unit = {
if (socket != null) socket.close
}
}
21 changes: 21 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,13 @@ Apart from these, the following properties are also available, and may be useful
A password to the key-store.
</td>
</tr>
<tr>
<td><code>spark.ssl.keyStoreType</code></td>
<td>JKS</td>
<td>
The type of the key-store.
</td>
</tr>
<tr>
<td><code>spark.ssl.protocol</code></td>
<td>None</td>
Expand All @@ -1374,6 +1381,13 @@ Apart from these, the following properties are also available, and may be useful
page.
</td>
</tr>
<tr>
<td><code>spark.ssl.needAuth</code></td>
<td>false</td>
<td>
Set true if SSL needs client authentication.
</td>
</tr>
<tr>
<td><code>spark.ssl.trustStore</code></td>
<td>None</td>
Expand All @@ -1389,6 +1403,13 @@ Apart from these, the following properties are also available, and may be useful
A password to the trust-store.
</td>
</tr>
<tr>
<td><code>spark.ssl.trustStoreType</code></td>
<td>JKS</td>
<td>
The type of the trust-store.
</td>
</tr>
</table>


Expand Down
Loading

0 comments on commit 18982b4

Please sign in to comment.