[SPARK-2157] Enable tight firewall rules for Spark #1777

Status: Closed. This pull request wanted to merge 44 commits.

Changes from all commits (44 commits):
- 1c0981a Make port in HttpServer configurable (ash211, Jun 17, 2014)
- 49ee29b SPARK-1174 Add port configuration for HttpFileServer (ash211, Jun 17, 2014)
- f34115d SPARK-1176 Add port configuration for HttpBroadcast (ash211, Jun 17, 2014)
- 17c79bb Add a configuration option for spark-shell's class server (ash211, Jun 17, 2014)
- b80d2fd Make Spark's block manager port configurable (ash211, Jun 17, 2014)
- c5a0568 Fix ConnectionManager to retry with increment (ash211, Jun 17, 2014)
- cad16da Add fallover increment logic for HttpServer (ash211, Jun 17, 2014)
- 066dc7a Fix up HttpServer port increments (ash211, Jun 17, 2014)
- 5d84e0e Document new port configuration options (ash211, Jun 17, 2014)
- 9e4ad96 Reformat for style checker (ash211, Jun 17, 2014)
- 24a4c32 Remove type on val to match surrounding style (ash211, Jun 30, 2014)
- 0347aef Unify port fallback logic to a single place (ash211, Jun 30, 2014)
- 7c5bdc4 Fix style issue (ash211, Jun 30, 2014)
- 038a579 Trust the server start function to report the port the service starte… (ash211, Jun 30, 2014)
- ec676f4 Merge branch 'SPARK-2157' of github.com:ash211/spark into configure-p… (andrewor14, Aug 4, 2014)
- 73fbe89 Move start service logic to Utils (andrewor14, Aug 4, 2014)
- 6b550b0 Assorted fixes (andrewor14, Aug 4, 2014)
- ba32280 Minor fixes (andrewor14, Aug 5, 2014)
- 1d7e408 Treat 0 ports specially + return correct ConnectionManager port (andrewor14, Aug 5, 2014)
- 470f38c Special case non-"Address already in use" exceptions (andrewor14, Aug 5, 2014)
- e111d08 Add names for UI services (andrewor14, Aug 5, 2014)
- 3f8e51b Correct erroneous docs... (andrewor14, Aug 5, 2014)
- 4d9e6f3 Fix super subtle bug (andrewor14, Aug 5, 2014)
- 8d836e6 Also document SPARK_{MASTER/WORKER}_WEBUI_PORT (andrewor14, Aug 5, 2014)
- 6016e77 Add spark.executor.port (andrewor14, Aug 5, 2014)
- 9868358 Add a few miscellaneous ports (andrewor14, Aug 5, 2014)
- 151327a Merge branch 'master' of github.com:apache/spark into configure-ports (andrewor14, Aug 5, 2014)
- 2551eb2 Remove spark.worker.watcher.port (andrewor14, Aug 5, 2014)
- b565079 Add spark.ports.maxRetries (andrewor14, Aug 5, 2014)
- de1b207 Update docs to reflect new ports (andrewor14, Aug 5, 2014)
- bfbab28 Merge branch 'master' of github.com:apache/spark into configure-ports (andrewor14, Aug 5, 2014)
- e837cde Remove outdated TODOs (andrewor14, Aug 5, 2014)
- cb3be88 Various doc fixes (broken link, format etc.) (andrewor14, Aug 5, 2014)
- 1d2d5c6 Fix ports for standalone cluster mode (andrewor14, Aug 5, 2014)
- 86461e2 Remove spark.executor.env.port and spark.standalone.client.port (andrewor14, Aug 5, 2014)
- a2dd05c Patrick's comment nit (andrewor14, Aug 5, 2014)
- d502e5f Handle port collisions when creating Akka systems (andrewor14, Aug 5, 2014)
- 93d359f Executors connect to wrong port when collision occurs (andrewor14, Aug 5, 2014)
- c22ad00 Merge branch 'master' of github.com:apache/spark into configure-ports (andrewor14, Aug 5, 2014)
- b97b02a Minor fixes (andrewor14, Aug 6, 2014)
- 523c30e Add test for isBindCollision (andrewor14, Aug 6, 2014)
- 7da0493 Fix tests (andrewor14, Aug 6, 2014)
- 8a6b820 Use a random UI port during tests (andrewor14, Aug 6, 2014)
- 621267b Merge branch 'master' of github.com:apache/spark into configure-ports (andrewor14, Aug 6, 2014)
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
88 changes: 54 additions & 34 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
-  extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
 
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
     }
   }
 
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
   /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
    }
  }
}
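
Every call site in this PR now funnels through `Utils.startServiceOnPort`, but the Utils change itself is outside this excerpt. The sketch below shows the contract the callers rely on, under stated assumptions: a bind failure on a nonzero port is retried on the next port up to `spark.ports.maxRetries` times (commit b565079), port 0 ("any free port") is never retried (commit 1d7e408), and other exceptions propagate (commit 470f38c). The real helper also classifies wrapped bind failures via `isBindCollision` (commit 523c30e), which the plain `BindException` match here only approximates:

```scala
import java.net.BindException
import org.apache.spark.SparkException

// Sketch only: the real implementation lives in org.apache.spark.util.Utils.
// startService must bind the service on the given port and return
// (service, actualPort), where actualPort is the port actually bound.
def startServiceOnPort[T](
    startPort: Int,
    startService: Int => (T, Int),
    serviceName: String = "",
    maxRetries: Int = 16): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    // Port 0 asks the OS for any free port, so retrying a different port is pointless.
    val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
    try {
      return startService(tryPort)
    } catch {
      // Retry only genuine bind collisions on explicitly requested ports.
      case e: BindException if startPort != 0 && offset < maxRetries =>
        Console.err.println(s"$serviceName could not bind on port $tryPort, retrying.")
    }
  }
  throw new SparkException(s"Failed to start $serviceName on port $startPort")
}
```

Returning the actual bound port, not the requested one, is what lets SparkEnv below publish the real `spark.driver.port` to executors.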
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
   }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }
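
A few illustrative inputs for the predicate above (the example keys are chosen here for demonstration, not taken from the diff):

```scala
SparkConf.isSparkPortConf("spark.blockManager.port") // true: "spark." prefix, ".port" suffix
SparkConf.isSparkPortConf("spark.ui.port")           // true
SparkConf.isSparkPortConf("spark.ports.maxRetries")  // false: does not end with ".port"
SparkConf.isSparkPortConf("akka.remote.netty.port")  // false: does not start with "spark."
```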
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port", boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server
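
The reason the driver must always republish `spark.driver.port`: executors build the driver's Akka URL from that key, so after a collision-driven rebind the requested port would point at nothing. Schematically, as a hedged sketch of the Akka-era addressing rather than a quote from this PR:

```scala
// Sketch: executor-side construction of the driver's actor URL. If
// spark.driver.port still held the requested (rather than bound) port,
// this would address the wrong socket.
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val driverUrl =
  s"akka.tcp://spark@$driverHost:$driverPort/user/CoarseGrainedScheduler"
```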
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.AkkaUtils
@@ -34,7 +35,7 @@ class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
     port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
+      val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+        "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       // Create a new ActorSystem using driver's Spark properties to run the backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
+        "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -38,8 +38,12 @@ import scala.language.postfixOps
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager")
+  extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -105,7 +109,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
 
-  serverChannel.socket.bind(new InetSocketAddress(port))
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    (serverChannel, serverChannel.socket.getLocalPort)
+  }
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -60,10 +60,12 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-  val connectionManager = new ConnectionManager(0, conf, securityManager)
+  val connectionManager =
+    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
 
   implicit val futureExecContext = connectionManager.futureExecContext
 
26 changes: 9 additions & 17 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -174,40 +174,32 @@ private[spark] object JettyUtils extends Logging {
       hostName: String,
       port: Int,
       handlers: Seq[ServletContextHandler],
-      conf: SparkConf): ServerInfo = {
+      conf: SparkConf,
+      serverName: String = ""): ServerInfo = {
 
     val collection = new ContextHandlerCollection
     collection.setHandlers(handlers.toArray)
     addFilters(handlers, conf)
 
-    @tailrec
+    // Bind to the given port, or throw a java.net.BindException if the port is occupied
     def connect(currentPort: Int): (Server, Int) = {
       val server = new Server(new InetSocketAddress(hostName, currentPort))
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
       server.setHandler(collection)
 
-      Try {
+      try {
         server.start()
-      } match {
-        case s: Success[_] =>
-          (server, server.getConnectors.head.getLocalPort)
-        case f: Failure[_] =>
-          val nextPort = (currentPort + 1) % 65536
+        (server, server.getConnectors.head.getLocalPort)
+      } catch {
+        case e: Exception =>
          server.stop()
          pool.stop()
-          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
-          if (f.toString.contains("Address already in use")) {
-            logWarning(s"$msg - $f")
-          } else {
-            logError(msg, f.exception)
-          }
-          connect(nextPort)
+          throw e
       }
    }
 
-    val (server, boundPort) = connect(port)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
    ServerInfo(server, boundPort, collection)
  }
 
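
With `connect` now allowed to throw, `Utils.startServiceOnPort` has to decide whether a given exception actually means a port collision; the test added in commit 523c30e targets a helper for exactly that. A plausible sketch follows (hedged: the real `Utils.isBindCollision` may differ in which exception shapes it unwraps), the point being that Jetty can surface the collision wrapped inside another exception rather than as a bare `BindException`:

```scala
import java.net.BindException

// Sketch: classify whether a thrown exception means "port already in use".
// The collision can appear either directly as a BindException or buried in
// another exception's cause chain, so we walk the causes recursively.
def isBindCollision(exception: Throwable): Boolean = exception match {
  case e: BindException =>
    e.getMessage != null && e.getMessage.contains("Address already in use")
  case e: Exception if e.getCause != null => isBindCollision(e.getCause)
  case _ => false
}
```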
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -36,7 +36,7 @@ private[spark] class SparkUI(
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
-  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {
 
   def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -39,7 +39,8 @@ private[spark] abstract class WebUI(
     securityManager: SecurityManager,
     port: Int,
     conf: SparkConf,
-    basePath: String = "")
+    basePath: String = "",
+    name: String = "")
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
@@ -97,7 +98,7 @@
   def bind() {
     assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
     try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
       logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
     } catch {
       case e: Exception =>