-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] SPARK-2157 Ability to write tight firewall rules for Spark #1107
Changes from all commits
1c0981a
49ee29b
f34115d
17c79bb
b80d2fd
c5a0568
cad16da
066dc7a
5d84e0e
9e4ad96
24a4c32
0347aef
7c5bdc4
038a579
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package org.apache.spark | |
|
||
import java.io.File | ||
|
||
import org.apache.spark.network.PortManager | ||
import org.eclipse.jetty.util.security.{Constraint, Password} | ||
import org.eclipse.jetty.security.authentication.DigestAuthenticator | ||
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} | ||
|
@@ -41,45 +42,54 @@ private[spark] class ServerStateException(message: String) extends Exception(mes | |
* as well as classes created by the interpreter when the user types in code. This is just a wrapper | ||
* around a Jetty server. | ||
*/ | ||
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) | ||
extends Logging { | ||
private[spark] class HttpServer(resourceBase: File, | ||
securityManager: SecurityManager, | ||
localPort: Int = 0) extends Logging { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could this be called |
||
private var server: Server = null | ||
private var port: Int = -1 | ||
private var port: Int = localPort | ||
|
||
private def startOnPort(startPort: Int): (Server, Int) = { | ||
val server = new Server() | ||
val connector = new SocketConnector | ||
connector.setMaxIdleTime(60*1000) | ||
connector.setSoLingerTime(-1) | ||
connector.setPort(startPort) | ||
server.addConnector(connector) | ||
|
||
val threadPool = new QueuedThreadPool | ||
threadPool.setDaemon(true) | ||
server.setThreadPool(threadPool) | ||
val resHandler = new ResourceHandler | ||
resHandler.setResourceBase(resourceBase.getAbsolutePath) | ||
|
||
val handlerList = new HandlerList | ||
handlerList.setHandlers(Array(resHandler, new DefaultHandler)) | ||
|
||
if (securityManager.isAuthenticationEnabled()) { | ||
logDebug("HttpServer is using security") | ||
val sh = setupSecurityHandler(securityManager) | ||
// make sure we go through security handler to get resources | ||
sh.setHandler(handlerList) | ||
server.setHandler(sh) | ||
} else { | ||
logDebug("HttpServer is not using security") | ||
server.setHandler(handlerList) | ||
} | ||
|
||
server.start() | ||
val actualPort = server.getConnectors()(0).getLocalPort() | ||
|
||
(server, actualPort) | ||
} | ||
|
||
def start() { | ||
if (server != null) { | ||
throw new ServerStateException("Server is already started") | ||
} else { | ||
logInfo("Starting HTTP Server") | ||
server = new Server() | ||
val connector = new SocketConnector | ||
connector.setMaxIdleTime(60*1000) | ||
connector.setSoLingerTime(-1) | ||
connector.setPort(0) | ||
server.addConnector(connector) | ||
|
||
val threadPool = new QueuedThreadPool | ||
threadPool.setDaemon(true) | ||
server.setThreadPool(threadPool) | ||
val resHandler = new ResourceHandler | ||
resHandler.setResourceBase(resourceBase.getAbsolutePath) | ||
|
||
val handlerList = new HandlerList | ||
handlerList.setHandlers(Array(resHandler, new DefaultHandler)) | ||
|
||
if (securityManager.isAuthenticationEnabled()) { | ||
logDebug("HttpServer is using security") | ||
val sh = setupSecurityHandler(securityManager) | ||
// make sure we go through security handler to get resources | ||
sh.setHandler(handlerList) | ||
server.setHandler(sh) | ||
} else { | ||
logDebug("HttpServer is not using security") | ||
server.setHandler(handlerList) | ||
} | ||
|
||
server.start() | ||
port = server.getConnectors()(0).getLocalPort() | ||
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be a good idea to modify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you could just add an argument:
|
||
server = actualServer | ||
port = actualPort | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging { | |
|
||
private def createServer(conf: SparkConf) { | ||
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) | ||
server = new HttpServer(broadcastDir, securityManager) | ||
val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the type declaration is not necessary here |
||
server = new HttpServer(broadcastDir, securityManager, broadcastListenPort) | ||
server.start() | ||
serverUri = server.uri | ||
logInfo("Broadcast server started at " + serverUri) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.network | ||
|
||
import java.net.InetSocketAddress | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This and |
||
|
||
import org.apache.spark.{Logging, SparkException} | ||
import org.eclipse.jetty.server.Server | ||
|
||
private[spark] object PortManager extends Logging | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just put this function in |
||
{ | ||
|
||
/** | ||
* Start service on given port, or attempt to fall back to the n+1 port for a certain number of | ||
* retries | ||
* | ||
* @param startPort | ||
* @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 | ||
* total attempts, on ports n, n+1, n+2, and n+3 | ||
* @param startService Function to start service on a given port. Expected to throw a java.net | ||
* .BindException if the port is already in use | ||
* @tparam T | ||
* @throws SparkException When unable to start service in the given number of attempts | ||
* @return | ||
*/ | ||
def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): | ||
(T, Int) = { | ||
for( offset <- 0 to maxRetries) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
val tryPort = startPort + offset | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs to be mod 65536 or else you could go outside of the addressable port range. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, once it passes 65535 it needs to keep trying at 1024, right? below that ports requires superuser access to open. |
||
try { | ||
return startService(tryPort) | ||
} catch { | ||
case e: java.net.BindException => { | ||
if (!e.getMessage.contains("Address already in use") || | ||
offset == (maxRetries-1)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be pulled into the previous line |
||
throw e | ||
} | ||
logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) | ||
} | ||
case e: Exception => throw e | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't do anything I believe |
||
} | ||
} | ||
throw new SparkException(s"Couldn't start service on port $startPort") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should go down with the other spark import
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports