Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] SPARK-2157 Ability to write tight firewall rules for Spark #1107

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
var httpServer : HttpServer = null
var serverUri : String = null

def initialize() {
def initialize(port: Option[Int]) {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir, securityManager)
httpServer = if (port.isEmpty) {
new HttpServer(baseDir, securityManager)
} else {
new HttpServer(baseDir, securityManager, port.get)
}
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
Expand Down
74 changes: 42 additions & 32 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.apache.spark.network.PortManager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
Expand All @@ -41,45 +42,54 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
private[spark] class HttpServer(resourceBase: File,
securityManager: SecurityManager,
localPort: Int = 0) extends Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be called requestedPort to make it a bit more clear?

private var server: Server = null
private var port: Int = -1
private var port: Int = localPort

// Builds and starts a Jetty server bound to `startPort` (0 asks the OS for an ephemeral
// port) that serves the contents of `resourceBase`, and returns the server together with
// the port it actually bound to. Jetty's connector throws java.net.BindException from
// server.start() if the port is already in use, which the caller retries on.
private def startOnPort(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)

// Daemon threads so a lingering HTTP server cannot keep the JVM alive on shutdown.
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
// Ask Jetty for the bound port; differs from startPort when startPort == 0 (ephemeral).
val actualPort = server.getConnectors()(0).getLocalPort()

(server, actualPort)
}

def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
port = server.getConnectors()(0).getLocalPort()
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a good idea to modify startWithIncrements to accept the name of the service being started? I'm a bit worried about the case where the user tries to lock down all the ports, but there is contention. Instead of failing there Spark will just log a WARN message, so at least we should say what component is contended in the WARN message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could just add an argument:

def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int), serviceName: String):

server = actualServer
port = actualPort
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ object SparkEnv extends Logging {
val httpFileServer =
if (isDriver) {
val server = new HttpFileServer(securityManager)
server.initialize()
server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt))
conf.set("spark.fileserver.uri", server.serverUri)
server
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {

private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir, securityManager)
val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type declaration is not necessary here

server = new HttpServer(broadcastDir, securityManager, broadcastListenPort)
server.start()
serverUri = server.uri
logInfo("Broadcast server started at " + serverUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
serverChannel.socket.setReuseAddress(true)
serverChannel.socket.setReceiveBufferSize(256 * 1024)

serverChannel.socket.bind(new InetSocketAddress(port))
private def startService(port: Int) = {
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, port)
}
PortManager.startWithIncrements(port, 3, startService)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/scala/org/apache/spark/network/PortManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.net.InetSocketAddress
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and `Server` below are unused imports.


import org.apache.spark.{Logging, SparkException}
import org.eclipse.jetty.server.Server

private[spark] object PortManager extends Logging {

  /**
   * Start a service on the given port, falling back to successive ports for a limited
   * number of retries when the port is already in use.
   *
   * @param startPort the first port to attempt; a value of 0 requests an OS-assigned
   *                  ephemeral port and is never incremented
   * @param maxRetries maximum number of retries. A value of e.g. 3 will cause 4 total
   *                   attempts, on ports n, n+1, n+2, and n+3
   * @param startService function to start the service on a given port, returning the
   *                     service and the port it actually bound to. Expected to throw a
   *                     java.net.BindException if the port is already in use.
   * @tparam T the type of the service being started
   * @throws SparkException when unable to start the service within maxRetries + 1 attempts
   * @return the started service and the port it is bound to
   */
  def startWithIncrements[T](startPort: Int, maxRetries: Int,
      startService: Int => (T, Int)): (T, Int) = {
    var offset = 0
    // A plain while loop (rather than `for ... return`) keeps the early return local and
    // avoids the NonLocalReturnControl exception a return inside a closure would incur.
    while (offset <= maxRetries) {
      // Keep candidate ports inside the valid, non-privileged range [1024, 65535] instead
      // of running past 65535; port 0 (ephemeral) is left untouched.
      val tryPort = if (startPort == 0) {
        0
      } else {
        (startPort + offset - 1024) % (65536 - 1024) + 1024
      }
      try {
        return startService(tryPort)
      } catch {
        case e: java.net.BindException =>
          // Rethrow immediately for bind failures that are not port contention, and on
          // the final attempt (offset == maxRetries, giving maxRetries + 1 tries total)
          // so the caller sees the underlying cause.
          if (!e.getMessage.contains("Address already in use") || offset == maxRetries) {
            throw e
          }
          logWarning(s"Could not bind on port $tryPort. Attempting port ${tryPort + 1}.")
      }
      offset += 1
    }
    // Unreachable in practice: the final attempt either returns or rethrows, but the
    // compiler cannot prove the loop produces a value.
    throw new SparkException(s"Couldn't start service on port $startPort")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ private[spark] class BlockManager(
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0, conf, securityManager)
val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf,
securityManager)

implicit val futureExecContext = connectionManager.futureExecContext

Expand Down
41 changes: 30 additions & 11 deletions docs/spark-standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ configure those ports.
</tr>
<tr>
<td>Browser</td>
<td>Driver</td>
<td>Application</td>
<td>4040</td>
<td>Web UI</td>
<td><code>spark.ui.port</code></td>
Expand Down Expand Up @@ -372,18 +372,37 @@ configure those ports.

<!-- Other misc stuff -->
<tr>
<td>Driver and other Workers</td>
<td>Worker</td>
<td>Application</td>
<td>(random)</td>
<td>
<ul>
<li>File server for file and jars</li>
<li>Http Broadcast</li>
<li>Class file server (Spark Shell only)</li>
</ul>
</td>
<td>None</td>
<td>Jetty-based. Each of these services starts on a random port that cannot be configured</td>
<td>File server for files and jars</td>
<td><code>spark.fileserver.port</code></td>
<td>Jetty-based</td>
</tr>
<tr>
<td>Worker</td>
<td>Application</td>
<td>(random)</td>
<td>HTTP Broadcast</td>
<td><code>spark.broadcast.port</code></td>
<td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
instead</td>
</tr>
<tr>
<td>Worker</td>
<td>Spark Shell</td>
<td>(random)</td>
<td>Class file server (Spark Shell only)</td>
<td><code>spark.replClassServer.port</code></td>
<td>Jetty-based</td>
</tr>
<tr>
<td>Worker</td>
<td>Other Workers</td>
<td>(random)</td>
<td>Block Manager port</td>
<td><code>spark.blockManager.port</code></td>
<td>Raw socket via ServerSocketChannel</td>
</tr>

</table>
Expand Down
3 changes: 2 additions & 1 deletion repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ import org.apache.spark.util.Utils

val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
/** Jetty server that will serve our classes to worker nodes */
val classServer = new HttpServer(outputDir, new SecurityManager(conf))
val classServerListenPort = conf.getInt("spark.replClassServer.port", 0)
val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort)
private var currentSettings: Settings = initialSettings
var printResults = true // whether to print result lines
var totalSilence = false // whether to print anything
Expand Down