diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index edc3889c9ae51..677c5e0f89d72 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -24,6 +24,7 @@ import com.google.common.io.Files import org.apache.spark.util.Utils private[spark] class HttpFileServer( + conf: SparkConf, securityManager: SecurityManager, requestedPort: Int = 0) extends Logging { @@ -41,7 +42,7 @@ private[spark] class HttpFileServer( fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server") + httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server") httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 912558d0cab7d..fa22787ce7ea3 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * around a Jetty server. */ private[spark] class HttpServer( + conf: SparkConf, resourceBase: File, securityManager: SecurityManager, requestedPort: Int = 0, @@ -57,7 +58,7 @@ private[spark] class HttpServer( } else { logInfo("Starting HTTP Server") val (actualServer, actualPort) = - Utils.startServiceOnPort[Server](requestedPort, doStart, serverName) + Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index c14764f773982..93d98db5e3f5e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -370,7 +370,9 @@ private[spark] object SparkConf { } /** - * Return whether the given config is a Spark port config. + * Return true if the given config matches either `spark.*.port` or `spark.port.*`. */ - def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port") + def isSparkPortConf(name: String): Boolean = { + (name.startsWith("spark.") && name.endsWith(".port")) | name.startsWith("spark.port.") + } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 43436a1697000..4d418037bd33f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -312,7 +312,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { val fileServerPort = conf.getInt("spark.fileserver.port", 0) - val server = new HttpFileServer(securityManager, fileServerPort) + val server = new HttpFileServer(conf, securityManager, fileServerPort) server.initialize() conf.set("spark.fileserver.uri", server.serverUri) server diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 31f0a462f84d8..31d6958c403b3 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) val broadcastPort = conf.getInt("spark.broadcast.port", 0) - server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") + server = + new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 3340fca08014e..03c4137ca0a81 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -174,7 +174,7 @@ private[nio] class ConnectionManager( serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, serverChannel.socket.getLocalPort) } - Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) + Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 2a27d49d2de05..88fed833f922d 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging { } } - val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName) + val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName) ServerInfo(server, boundPort, collection) } diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index db2531dc171f8..4c9b1e3c46f0f 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } - Utils.startServiceOnPort(port, startService, name) + Utils.startServiceOnPort(port, startService, conf, name) } private def doCreateActorSystem( diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c4f1898a2db15..0c21c3d1b8a4b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1692,15 +1692,13 @@ private[spark] object Utils extends Logging { /** * Default maximum number of retries when binding to a port before giving up. */ - val portMaxRetries: Int = { - if (sys.props.contains("spark.testing")) { + def portMaxRetries(conf: SparkConf): Int = { + val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt) + if (conf.contains("spark.testing")) { // Set a higher number of retries for tests... - sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100) + maxRetries.getOrElse(100) } else { - Option(SparkEnv.get) - .flatMap(_.conf.getOption("spark.port.maxRetries")) - .map(_.toInt) - .getOrElse(16) + maxRetries.getOrElse(16) } } @@ -1709,17 +1707,18 @@ private[spark] object Utils extends Logging { * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * * @param startPort The initial port to start the service on. - * @param maxRetries Maximum number of retries to attempt. - * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. + * @param conf A SparkConf used to get the maximum number of retries when binding to a port. + * @param serviceName Name of the service. */ def startServiceOnPort[T]( startPort: Int, startService: Int => (T, Int), - serviceName: String = "", - maxRetries: Int = portMaxRetries): (T, Int) = { + conf: SparkConf, + serviceName: String = ""): (T, Int) = { val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + val maxRetries = portMaxRetries(conf) for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) { diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 13943ed5442b9..f333e3891b5f0 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) - })._2 + }, conf)._2 } /** Setup and start the streaming context */ diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 30727dfa64437..fe53a29cba0c9 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -32,6 +32,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.SparkConf import org.apache.spark.util.Utils class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { @@ -106,7 +107,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) - })._2 + }, new SparkConf())._2 } def publishData(data: String): Unit = { diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 646c68e60c2e9..b646f0b6f0868 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -106,7 +106,7 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ val classServerPort = conf.getInt("spark.replClassServer.port", 0) - val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") + val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 5e93a71995072..69e44d4f916e1 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -32,7 +32,7 @@ object Main extends Logging { val s = new Settings() s.processArguments(List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true) - val classServer = new HttpServer(outputDir, new SecurityManager(conf)) + val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf)) var sparkContext: SparkContext = _ var interp = new SparkILoop // this is a public var because tests reset it. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ebf5616e8d303..c537da9f67552 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -148,12 +148,9 @@ class ExecutorRunnable( // registers with the Scheduler and transfers the spark configs. Since the Executor backend // uses Akka to connect to the scheduler, the akka settings are needed as well as the // authentication settings. - sparkConf.getAll. - filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - - sparkConf.getAkkaConf. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } + sparkConf.getAll + .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) } + .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out.