From 1c0981a3f93c3bbb27425d76342c98c8c7d469cf Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 16 Jun 2014 22:09:59 -0700 Subject: [PATCH 01/39] Make port in HttpServer configurable --- core/src/main/scala/org/apache/spark/HttpServer.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 7e9b517f901a2..deebf19d1a462 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -41,10 +41,11 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. */ -private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) - extends Logging { +private[spark] class HttpServer(resourceBase: File, + securityManager: SecurityManager, + localPort: Int = 0) extends Logging { private var server: Server = null - private var port: Int = -1 + private var port: Int = localPort def start() { if (server != null) { @@ -55,7 +56,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan val connector = new SocketConnector connector.setMaxIdleTime(60*1000) connector.setSoLingerTime(-1) - connector.setPort(0) + connector.setPort(localPort) server.addConnector(connector) val threadPool = new QueuedThreadPool From 49ee29b49e1275b48d18aef5182dba2937c11358 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 16 Jun 2014 22:31:10 -0700 Subject: [PATCH 02/39] SPARK-1174 Add port configuration for HttpFileServer Uses spark.fileserver.port --- core/src/main/scala/org/apache/spark/HttpFileServer.scala | 8 ++++++-- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 0e3750fdde415..f575a0d65e80d 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -31,14 +31,18 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo var httpServer : HttpServer = null var serverUri : String = null - def initialize() { + def initialize(port: Option[Int]) { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(baseDir, securityManager) + httpServer = if (port.isEmpty) { + new HttpServer(baseDir, securityManager) + } else { + new HttpServer(baseDir, securityManager, port.get) + } httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8f70744d804d9..6e272f7ff2211 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -231,7 +231,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { val server = new HttpFileServer(securityManager) - server.initialize() + server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt)) conf.set("spark.fileserver.uri", server.serverUri) server } else { From 
f34115d59b83163d9542be09eb0c89a87ea89309 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 16 Jun 2014 22:31:52 -0700 Subject: [PATCH 03/39] SPARK-1176 Add port configuration for HttpBroadcast Uses spark.broadcast.port --- .../main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 487456467b23b..eaf3b69a7c5a6 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) - server = new HttpServer(broadcastDir, securityManager) + val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0) + server = new HttpServer(broadcastDir, securityManager, broadcastListenPort) server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) From 17c79bbd66708d24c093be5f43e60c61f504d19d Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 16 Jun 2014 23:00:19 -0700 Subject: [PATCH 04/39] Add a configuration option for spark-shell's class server spark.replClassServer.port --- repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 3842c291d0b7b..50897a71101f1 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -102,7 +102,8 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - val classServer = new HttpServer(outputDir, new SecurityManager(conf)) + val classServerListenPort: Int = conf.getInt("spark.replClassServer.port", 0) + val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort) private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything From b80d2fd8e9b27a4d49561d31f100ffbb75393685 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 16 Jun 2014 23:40:32 -0700 Subject: [PATCH 05/39] Make Spark's block manager port configurable spark.blockManager.port --- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0db0a5bc7341b..ae55a0ace5b4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -63,7 +63,8 @@ private[spark] class BlockManager( val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) - val connectionManager = new ConnectionManager(0, conf, securityManager) + val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf, + securityManager) implicit val 
futureExecContext = connectionManager.futureExecContext From c5a05684ace9332077dbf63848d08f39a8b91628 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 17 Jun 2014 01:10:21 -0700 Subject: [PATCH 06/39] Fix ConnectionManager to retry with increment Fails when running master+worker+executor+shell on the same machine. I think the issue is that both the shell and the executor attempt to start a ConnectionManager, which causes port conflicts. Solution is to attempt and increment on BindExceptions --- .../spark/network/ConnectionManager.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 8a1cdb812962e..f811c3d5021b4 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -102,7 +102,24 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - serverChannel.socket.bind(new InetSocketAddress(port)) + def bindWithIncrement(port: Int, maxTries: Int = 3) { + for( offset <- 0 until maxTries ) { + try { + serverChannel.socket.bind(new InetSocketAddress(port + offset)) + return + } catch { + case e: java.net.BindException => { + if(!e.getMessage.contains("Address already in use") || + offset == maxTries) { + throw e + } + logInfo("Could not bind on port: " + (port+offset)) + } + case e: Exception => throw e + } + } + } + bindWithIncrement(port, 3) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) From cad16dacb1b7dbac1122b38c2b02fe35f1303a59 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 17 Jun 2014 09:45:59 -0700 Subject: [PATCH 07/39] Add fallover increment logic for HttpServer --- .../scala/org/apache/spark/HttpServer.scala | 81 ++++++++++++------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index deebf19d1a462..1a86dffe98c46 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -47,40 +47,63 @@ private[spark] class HttpServer(resourceBase: File, private var server: Server = null private var port: Int = localPort + private def startOnPort(startPort: Int) { + val server = new Server() + val connector = new SocketConnector + connector.setMaxIdleTime(60*1000) + connector.setSoLingerTime(-1) + connector.setPort(startPort) + server.addConnector(connector) + + val threadPool = new QueuedThreadPool + threadPool.setDaemon(true) + server.setThreadPool(threadPool) + val resHandler = new ResourceHandler + resHandler.setResourceBase(resourceBase.getAbsolutePath) + + val handlerList = new HandlerList + handlerList.setHandlers(Array(resHandler, new DefaultHandler)) + + if (securityManager.isAuthenticationEnabled()) { + logDebug("HttpServer is using security") + val sh = setupSecurityHandler(securityManager) + // make sure we go through security handler to get resources + sh.setHandler(handlerList) + server.setHandler(sh) + } else { + logDebug("HttpServer is not using security") + server.setHandler(handlerList) + } + + server.start() + val actualPort = server.getConnectors()(0).getLocalPort() + + return (server, 
actualPort) + } + + private def startWithIncrements(startPort: Int, maxTries: Int) { + for( tryPort <- startPort until (startPort+maxTries)) { + try { + val (server, actualPort) = startOnPort(startPort) + return (server, actualPort) + } catch { + case e: java.net.BindException => { + if (!e.getMessage.contains("Address already in use")) { + throw e + } + logInfo("Could not bind on port: " + (tryPort)) + } + case e: Exception => throw e + } + } + } + def start() { if (server != null) { throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - server = new Server() - val connector = new SocketConnector - connector.setMaxIdleTime(60*1000) - connector.setSoLingerTime(-1) - connector.setPort(localPort) - server.addConnector(connector) - - val threadPool = new QueuedThreadPool - threadPool.setDaemon(true) - server.setThreadPool(threadPool) - val resHandler = new ResourceHandler - resHandler.setResourceBase(resourceBase.getAbsolutePath) - - val handlerList = new HandlerList - handlerList.setHandlers(Array(resHandler, new DefaultHandler)) - - if (securityManager.isAuthenticationEnabled()) { - logDebug("HttpServer is using security") - val sh = setupSecurityHandler(securityManager) - // make sure we go through security handler to get resources - sh.setHandler(handlerList) - server.setHandler(sh) - } else { - logDebug("HttpServer is not using security") - server.setHandler(handlerList) - } - - server.start() - port = server.getConnectors()(0).getLocalPort() + (server, port) = startWithIncrements(localPort, 3) } } From 066dc7ac936cfbf268e6ca7adfa1388f5c4049d6 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 17 Jun 2014 10:08:49 -0700 Subject: [PATCH 08/39] Fix up HttpServer port increments --- .../scala/org/apache/spark/HttpServer.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 1a86dffe98c46..1b62fe8be5193 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -47,7 +47,7 @@ private[spark] class HttpServer(resourceBase: File, private var server: Server = null private var port: Int = localPort - private def startOnPort(startPort: Int) { + private def startOnPort(startPort: Int): Tuple2[Server,Int] = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -81,21 +81,23 @@ private[spark] class HttpServer(resourceBase: File, return (server, actualPort) } - private def startWithIncrements(startPort: Int, maxTries: Int) { - for( tryPort <- startPort until (startPort+maxTries)) { + private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = { + for( offset <- 0 to maxRetries) { try { - val (server, actualPort) = startOnPort(startPort) + val (server, actualPort) = startOnPort(startPort+offset) return (server, actualPort) } catch { case e: java.net.BindException => { - if (!e.getMessage.contains("Address already in use")) { + if (!e.getMessage.contains("Address already in use") || + offset == (maxRetries-1)) { throw e } - logInfo("Could not bind on port: " + (tryPort)) + logInfo("Could not bind on port: " + (startPort+offset)) } case e: Exception => throw e } } + return (null, -1) } def start() { @@ -103,7 +105,9 @@ private[spark] class HttpServer(resourceBase: File, throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP 
Server") - (server, port) = startWithIncrements(localPort, 3) + val (actualServer, actualPort) = startWithIncrements(localPort, 3) + server = actualServer + port = actualPort } } From 5d84e0e9285aec53aa9c57d64313c0e513e41d30 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 17 Jun 2014 10:43:33 -0700 Subject: [PATCH 09/39] Document new port configuration options - spark.fileserver.port - spark.broadcast.port - spark.replClassServer.port - spark.blockManager.port --- docs/spark-standalone.md | 41 +++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ad8b6c0e51a78..2183c54f8c672 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -322,7 +322,7 @@ configure those ports. Browser - Driver + Application 4040 Web UI spark.ui.port @@ -372,18 +372,37 @@ configure those ports. - Driver and other Workers Worker + Application (random) - -
-      <ul>
-        <li>File server for file and jars</li>
-        <li>Http Broadcast</li>
-        <li>Class file server (Spark Shell only)</li>
-      </ul>
- - None - Jetty-based. Each of these services starts on a random port that cannot be configured + File server for files and jars + spark.fileserver.port + Jetty-based + + + Worker + Application + (random) + HTTP Broadcast + spark.broadcast.port + Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager + instead + + + Worker + Spark Shell + (random) + Class file server (Spark Shell only) + spark.replClassServer.port + Jetty-based + + + Worker + Other Workers + (random) + Block Manager port + spark.blockManager.port + Raw socket via ServerSocketChannel From 9e4ad9628f7ff0f96a3881a1a5aaedcb8be6b80d Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 17 Jun 2014 11:14:08 -0700 Subject: [PATCH 10/39] Reformat for style checker --- core/src/main/scala/org/apache/spark/HttpServer.scala | 4 ++-- .../scala/org/apache/spark/network/ConnectionManager.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 1b62fe8be5193..491d8307c8c0b 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -84,7 +84,7 @@ private[spark] class HttpServer(resourceBase: File, private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = { for( offset <- 0 to maxRetries) { try { - val (server, actualPort) = startOnPort(startPort+offset) + val (server, actualPort) = startOnPort(startPort + offset) return (server, actualPort) } catch { case e: java.net.BindException => { @@ -92,7 +92,7 @@ private[spark] class HttpServer(resourceBase: File, offset == (maxRetries-1)) { throw e } - logInfo("Could not bind on port: " + (startPort+offset)) + logInfo("Could not bind on port: " + (startPort + offset)) } case e: Exception => throw e } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index f811c3d5021b4..d3e575cb5ef29 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -113,7 +113,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, offset == maxTries) { throw e } - logInfo("Could not bind on port: " + (port+offset)) + logInfo("Could not bind on port: " + (port + offset)) } case e: Exception => throw e } From 24a4c327c7441e6af6b82dbddacd71c57384dc04 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Sun, 29 Jun 2014 17:25:44 -0700 Subject: [PATCH 11/39] Remove type on val to match surrounding style --- repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 50897a71101f1..c62c3eb64f6a7 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -102,7 +102,7 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - val classServerListenPort: Int = conf.getInt("spark.replClassServer.port", 0) + val classServerListenPort = conf.getInt("spark.replClassServer.port", 0) val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort) 
private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines From 0347aef2b686d1bcc1b8f5c230ba8ff99cbd0691 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Sun, 29 Jun 2014 22:26:48 -0700 Subject: [PATCH 12/39] Unify port fallback logic to a single place --- .../scala/org/apache/spark/HttpServer.scala | 26 ++------ .../spark/network/ConnectionManager.scala | 21 ++----- .../apache/spark/network/PortManager.scala | 61 +++++++++++++++++++ 3 files changed, 69 insertions(+), 39 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/network/PortManager.scala diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 491d8307c8c0b..929b6380f51a9 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.apache.spark.network.PortManager import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} @@ -47,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File, private var server: Server = null private var port: Int = localPort - private def startOnPort(startPort: Int): Tuple2[Server,Int] = { + private def startOnPort(startPort: Int): Server = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -78,26 +79,7 @@ private[spark] class HttpServer(resourceBase: File, server.start() val actualPort = server.getConnectors()(0).getLocalPort() - return (server, actualPort) - } - - private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = { - for( offset <- 0 to maxRetries) { - try { - val (server, actualPort) = startOnPort(startPort + offset) - return (server, actualPort) - } catch { - case e: java.net.BindException => { - if (!e.getMessage.contains("Address already in use") || - offset == (maxRetries-1)) { - throw e - } - logInfo("Could not bind on port: " + (startPort + offset)) - } - case e: Exception => throw e - } - } - return (null, -1) + server } def start() { @@ -105,7 +87,7 @@ private[spark] class HttpServer(resourceBase: File, throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - val (actualServer, actualPort) = startWithIncrements(localPort, 3) + val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index d3e575cb5ef29..e3ff67829b785 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -102,24 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - def bindWithIncrement(port: Int, maxTries: Int = 3) { - for( offset <- 0 until maxTries ) { - try { - serverChannel.socket.bind(new InetSocketAddress(port + offset)) - return - } catch { - case e: java.net.BindException => { - if(!e.getMessage.contains("Address already in use") || - offset == maxTries) { - throw e - } 
- logInfo("Could not bind on port: " + (port + offset)) - } - case e: Exception => throw e - } - } + private def startService(port: Int) = { + serverChannel.socket.bind(new InetSocketAddress(port)) + serverChannel } - bindWithIncrement(port, 3) + PortManager.startWithIncrements(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala new file mode 100644 index 0000000000000..82cafa637f3dc --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/PortManager.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network + +import java.net.InetSocketAddress + +import org.apache.spark.{Logging, SparkException} +import org.eclipse.jetty.server.Server + +private[spark] object PortManager extends Logging +{ + + /** + * Start service on given port, or attempt to fall back to the n+1 port for a certain number of + * retries + * + * @param startPort + * @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 + * total attempts, on ports n, n+1, n+2, and n+3 + * @param startService Function to start service on a given port. Expected to throw a java.net + * .BindException if the port is already in use + * @tparam T + * @throws SparkException When unable to start service in the given number of attempts + * @return + */ + def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => T): + (T, Int) = { + for( offset <- 0 to maxRetries) { + val tryPort = startPort + offset + try { + val service: T = startService(tryPort) + return (service, tryPort) + } catch { + case e: java.net.BindException => { + if (!e.getMessage.contains("Address already in use") || + offset == (maxRetries-1)) { + throw e + } + logInfo("Could not bind on port: " + tryPort + ". 
Attempting port " + (tryPort+1)) + } + case e: Exception => throw e + } + } + throw new SparkException(s"Couldn't start service on port $startPort") + } +} From 7c5bdc44df32fb550f375de3518b628fbb360d20 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Sun, 29 Jun 2014 22:34:47 -0700 Subject: [PATCH 13/39] Fix style issue --- core/src/main/scala/org/apache/spark/network/PortManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala index 82cafa637f3dc..47b40b0f17b28 100644 --- a/core/src/main/scala/org/apache/spark/network/PortManager.scala +++ b/core/src/main/scala/org/apache/spark/network/PortManager.scala @@ -51,7 +51,7 @@ private[spark] object PortManager extends Logging offset == (maxRetries-1)) { throw e } - logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort+1)) + logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) } case e: Exception => throw e } From 038a579a26ffcfc1c5540f28176f236779eef12a Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 30 Jun 2014 00:02:17 -0700 Subject: [PATCH 14/39] Trust the server start function to report the port the service started on --- core/src/main/scala/org/apache/spark/HttpServer.scala | 4 ++-- .../scala/org/apache/spark/network/ConnectionManager.scala | 2 +- .../main/scala/org/apache/spark/network/PortManager.scala | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 929b6380f51a9..e0c76feb124a2 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -48,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File, private var server: Server = null private var port: Int = localPort - private def startOnPort(startPort: Int): Server = { + private def startOnPort(startPort: Int): (Server, Int) = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -79,7 +79,7 @@ private[spark] class HttpServer(resourceBase: File, server.start() val actualPort = server.getConnectors()(0).getLocalPort() - server + (server, actualPort) } def start() { diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index e3ff67829b785..7e498fb78a06e 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -104,7 +104,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, private def startService(port: Int) = { serverChannel.socket.bind(new InetSocketAddress(port)) - serverChannel + (serverChannel, port) } PortManager.startWithIncrements(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala index 47b40b0f17b28..f9ad9629fe80a 100644 --- a/core/src/main/scala/org/apache/spark/network/PortManager.scala +++ b/core/src/main/scala/org/apache/spark/network/PortManager.scala @@ -38,13 +38,12 @@ private[spark] object PortManager extends Logging * @throws SparkException When unable to start service in the given number of attempts * @return */ - def 
startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => T): + def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): (T, Int) = { for( offset <- 0 to maxRetries) { val tryPort = startPort + offset try { - val service: T = startService(tryPort) - return (service, tryPort) + return startService(tryPort) } catch { case e: java.net.BindException => { if (!e.getMessage.contains("Address already in use") || From 73fbe892794a6f7e4a051401f356c89f4aa7f81f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 15:39:01 -0700 Subject: [PATCH 15/39] Move start service logic to Utils --- .../scala/org/apache/spark/HttpServer.scala | 5 +- .../spark/network/ConnectionManager.scala | 2 +- .../apache/spark/network/PortManager.scala | 60 ------------------- .../org/apache/spark/ui/JettyUtils.scala | 4 +- .../scala/org/apache/spark/util/Utils.scala | 34 ++++++++++- 5 files changed, 38 insertions(+), 67 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/network/PortManager.scala diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index e0c76feb124a2..3883a9cb71f40 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,10 +19,9 @@ package org.apache.spark import java.io.File -import org.apache.spark.network.PortManager import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator -import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} +import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.bio.SocketConnector @@ -87,7 +86,7 @@ private[spark] class HttpServer(resourceBase: File, throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) + val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index e624414861b1d..382e6362cf953 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -109,7 +109,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, port) } - PortManager.startWithIncrements(port, 3, startService) + Utils.startServiceOnPort(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala deleted file mode 100644 index f9ad9629fe80a..0000000000000 --- a/core/src/main/scala/org/apache/spark/network/PortManager.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network - -import java.net.InetSocketAddress - -import org.apache.spark.{Logging, SparkException} -import org.eclipse.jetty.server.Server - -private[spark] object PortManager extends Logging -{ - - /** - * Start service on given port, or attempt to fall back to the n+1 port for a certain number of - * retries - * - * @param startPort - * @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 - * total attempts, on ports n, n+1, n+2, and n+3 - * @param startService Function to start service on a given port. Expected to throw a java.net - * .BindException if the port is already in use - * @tparam T - * @throws SparkException When unable to start service in the given number of attempts - * @return - */ - def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): - (T, Int) = { - for( offset <- 0 to maxRetries) { - val tryPort = startPort + offset - try { - return startService(tryPort) - } catch { - case e: java.net.BindException => { - if (!e.getMessage.contains("Address already in use") || - offset == (maxRetries-1)) { - throw e - } - logInfo("Could not bind on port: " + tryPort + ". 
Attempting port " + (tryPort + 1)) - } - case e: Exception => throw e - } - } - throw new SparkException(s"Couldn't start service on port $startPort") - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a2535e3c1c41f..a4be870757250 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.{BindException, InetSocketAddress, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} @@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 30073a82857d2..844675eeddf09 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection} +import java.net._ import java.nio.ByteBuffer import java.util.{Locale, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} @@ -1331,4 +1331,36 @@ private[spark] object Utils extends Logging { .map { case (k, v) => s"-D$k=$v" } } + /** + * Attempt to start a service on the given port, or fail after a number of attempts. + * Each subsequent attempt uses 1 + the port used in the previous attempt. + * + * @param startPort The initial port to start the service on. + * @param maxRetries Maximum number of retries to attempt. + * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. + * @param startService Function to start service on a given port. + * This is expected to throw java.net.BindException on port collision. + * @throws SparkException When unable to start service in the given number of attempts + * @return + */ + def startServiceOnPort[T]( + startPort: Int, + maxRetries: Int, + startService: Int => (T, Int)): (T, Int) = { + for (offset <- 0 to maxRetries) { + val tryPort = (startPort + offset) % 65536 + try { + return startService(tryPort) + } catch { + case e: BindException => + if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { + throw e + } + logInfo("Could not bind on port: " + tryPort + ". 
Attempting port " + (tryPort + 1)) + } + } + // Should never happen + throw new SparkException(s"Couldn't start service on port $startPort") + } + } From 6b550b0681ae8c0394685f6e929c4a14a48d10ec Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 16:56:17 -0700 Subject: [PATCH 16/39] Assorted fixes --- .../org/apache/spark/HttpFileServer.scala | 13 +++--- .../scala/org/apache/spark/HttpServer.scala | 45 +++++++++++-------- .../scala/org/apache/spark/SparkEnv.scala | 5 ++- .../spark/broadcast/HttpBroadcast.scala | 4 +- .../spark/network/ConnectionManager.scala | 11 +++-- .../apache/spark/storage/BlockManager.scala | 5 ++- .../scala/org/apache/spark/util/Utils.scala | 17 ++++--- .../org/apache/spark/repl/SparkIMain.scala | 4 +- 8 files changed, 62 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index f575a0d65e80d..edc3889c9ae51 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -23,7 +23,10 @@ import com.google.common.io.Files import org.apache.spark.util.Utils -private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging { +private[spark] class HttpFileServer( + securityManager: SecurityManager, + requestedPort: Int = 0) + extends Logging { var baseDir : File = null var fileDir : File = null @@ -31,18 +34,14 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo var httpServer : HttpServer = null var serverUri : String = null - def initialize(port: Option[Int]) { + def initialize() { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = if (port.isEmpty) { - new HttpServer(baseDir, securityManager) - } else { - new HttpServer(baseDir, securityManager, port.get) - } + httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server") httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 3883a9cb71f40..9ced0b87a5187 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -41,13 +41,33 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. */ -private[spark] class HttpServer(resourceBase: File, - securityManager: SecurityManager, - localPort: Int = 0) extends Logging { +private[spark] class HttpServer( + resourceBase: File, + securityManager: SecurityManager, + requestedPort: Int = 0, + serverName: String = "HTTP server") extends Logging { private var server: Server = null - private var port: Int = localPort + private var port: Int = requestedPort - private def startOnPort(startPort: Int): (Server, Int) = { + def start() { + if (server != null) { + throw new ServerStateException("Server is already started") + } else { + logInfo("Starting HTTP Server") + val (actualServer, actualPort) = + Utils.startServiceOnPort[Server](requestedPort, doStart, serverName) + server = actualServer + port = actualPort + } + } + + /** + * Actually start the HTTP server on the given port. 
+ * + * Note that this is only best effort in the sense that we may end up binding to a nearby port + * in the event of port collision. Return the bound server and the actual port used. + */ + private def doStart(startPort: Int): (Server, Int) = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -76,22 +96,11 @@ private[spark] class HttpServer(resourceBase: File, } server.start() - val actualPort = server.getConnectors()(0).getLocalPort() + val actualPort = server.getConnectors()(0).getLocalPort (server, actualPort) } - def start() { - if (server != null) { - throw new ServerStateException("Server is already started") - } else { - logInfo("Starting HTTP Server") - val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort) - server = actualServer - port = actualPort - } - } - /** * Setup Jetty to the HashLoginService using a single user with our * shared secret. Configure it to use DIGEST-MD5 authentication so that the password @@ -143,7 +152,7 @@ private[spark] class HttpServer(resourceBase: File, if (server == null) { throw new ServerStateException("Server is not started") } else { - return "http://" + Utils.localIpAddress + ":" + port + "http://" + Utils.localIpAddress + ":" + port } } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ab0e1d6c3ba93..051814f074c80 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -225,8 +225,9 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { - val server = new HttpFileServer(securityManager) - server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt)) + val fileServerPort = conf.getInt("spark.fileserver.port", 0) + val server = new HttpFileServer(securityManager, fileServerPort) + server.initialize() conf.set("spark.fileserver.uri", server.serverUri) server } else { diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index eaf3b69a7c5a6..942dc7d7eac87 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,8 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) - val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0) - server = new HttpServer(broadcastDir, securityManager, broadcastListenPort) + val broadcastPort = conf.getInt("spark.broadcast.port", 0) + server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 382e6362cf953..a913580d8f6c6 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -38,8 +38,11 @@ import scala.language.postfixOps import org.apache.spark._ import org.apache.spark.util.{SystemClock, Utils} -private[spark] class ConnectionManager(port: Int, conf: SparkConf, - securityManager: SecurityManager) extends Logging { +private[spark] class ConnectionManager( + 
port: Int, + conf: SparkConf, + securityManager: SecurityManager, + name: String = "Connection manager") extends Logging { class MessageStatus( val message: Message, @@ -105,11 +108,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - private def startService(port: Int) = { + private def startService(port: Int): (ServerSocketChannel, Int) = { serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, port) } - Utils.startServiceOnPort(port, 3, startService) + Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f8330eb49c0c1..3876cf43e2a7d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -60,11 +60,12 @@ private[spark] class BlockManager( mapOutputTracker: MapOutputTracker) extends Logging { + private val port = conf.getInt("spark.blockManager.port", 0) val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) - val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf, - securityManager) + val connectionManager = + new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") implicit val futureExecContext = connectionManager.futureExecContext diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 844675eeddf09..042bc5b98fa0d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1345,22 +1345,29 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, - maxRetries: Int, - startService: Int => (T, Int)): (T, Int) = { + startService: Int => (T, Int), + serviceName: String = "", + maxRetries: Int = 3): (T, Int) = { for (offset <- 0 to maxRetries) { val tryPort = (startPort + offset) % 65536 try { return startService(tryPort) } catch { case e: BindException => + val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'" if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { - throw e + val exceptionMessage = + s"${e.getMessage}: $service failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception } - logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) + logInfo(s"$service could not bind on port $tryPort. 
Attempting port ${tryPort + 1}.") } } // Should never happen - throw new SparkException(s"Couldn't start service on port $startPort") + throw new SparkException(s"Failed to start service on port $startPort") } } diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 89b64df320498..84b57cd2dc1af 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -102,8 +102,8 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - val classServerListenPort = conf.getInt("spark.replClassServer.port", 0) - val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort) + val classServerPort = conf.getInt("spark.replClassServer.port", 0) + val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything From ba322807d2e5ed1ce69dae449238a1df16a74ae9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 17:00:31 -0700 Subject: [PATCH 17/39] Minor fixes --- core/src/main/scala/org/apache/spark/HttpServer.scala | 4 +++- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 4 ++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 9ced0b87a5187..ee018ac390f6e 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -45,7 +45,9 @@ private[spark] class HttpServer( resourceBase: File, securityManager: SecurityManager, requestedPort: Int = 0, - serverName: String = "HTTP server") extends Logging { + serverName: String = "HTTP server") + extends Logging { + private var server: Server = null private var port: Int = requestedPort diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a4be870757250..a2535e3c1c41f 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui -import java.net.{BindException, InetSocketAddress, URL} +import java.net.{InetSocketAddress, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} @@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 042bc5b98fa0d..f955f5bc59422 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1340,8 +1340,7 @@ private[spark] object Utils extends Logging { * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. 
* @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. - * @throws SparkException When unable to start service in the given number of attempts - * @return + * @throws SparkException When unable to start the service after a given number of attempts */ def startServiceOnPort[T]( startPort: Int, From 1d7e40813e6ae98ee5cffb3e9e61807f3a01e941 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 17:40:27 -0700 Subject: [PATCH 18/39] Treat 0 ports specially + return correct ConnectionManager port --- .../spark/network/ConnectionManager.scala | 2 +- .../org/apache/spark/ui/JettyUtils.scala | 27 +++++++------------ .../scala/org/apache/spark/util/Utils.scala | 14 ++++++---- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index a913580d8f6c6..f837474b0c6ef 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -110,7 +110,7 @@ private[spark] class ConnectionManager( private def startService(port: Int): (ServerSocketChannel, Int) = { serverChannel.socket.bind(new InetSocketAddress(port)) - (serverChannel, port) + (serverChannel, serverChannel.socket.getLocalPort) } Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a2535e3c1c41f..220b1bf234aaf 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -174,40 +174,33 @@ private[spark] object JettyUtils extends Logging { hostName: String, port: Int, handlers: Seq[ServletContextHandler], - conf: SparkConf): ServerInfo = { + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection collection.setHandlers(handlers.toArray) addFilters(handlers, conf) - @tailrec + // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) server.setHandler(collection) - - Try { + try { server.start() - } match { - case s: Success[_] => - (server, server.getConnectors.head.getLocalPort) - case f: Failure[_] => - val nextPort = (currentPort + 1) % 65536 + (server, server.getConnectors.head.getLocalPort) + } catch { + case e: Exception => server.stop() pool.stop() - val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort." 
- if (f.toString.contains("Address already in use")) { - logWarning(s"$msg - $f") - } else { - logError(msg, f.exception) - } - connect(nextPort) + throw e } } - val (server, boundPort) = connect(port) + val (server, boundPort) = + Utils.startServiceOnPort[Server](port, connect, serverName, maxRetries = 10) ServerInfo(server, boundPort, collection) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f955f5bc59422..f091e5ace1022 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1347,22 +1347,26 @@ private[spark] object Utils extends Logging { startService: Int => (T, Int), serviceName: String = "", maxRetries: Int = 3): (T, Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" for (offset <- 0 to maxRetries) { - val tryPort = (startPort + offset) % 65536 + // Do not increment port if startPort is 0, which is treated as a special port + val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 try { - return startService(tryPort) + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) } catch { case e: BindException => - val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'" if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { val exceptionMessage = - s"${e.getMessage}: $service failed after $maxRetries retries!" + s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" val exception = new BindException(exceptionMessage) // restore original stack trace exception.setStackTrace(e.getStackTrace) throw exception } - logInfo(s"$service could not bind on port $tryPort. Attempting port ${tryPort + 1}.") + logWarning(s"Service$serviceString could not bind on port $tryPort. " + + s"Attempting port ${tryPort + 1}.") } } // Should never happen From 470f38cf3c54941fbbcc358a358cc8a1fe2d6edd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 17:43:24 -0700 Subject: [PATCH 19/39] Special case non-"Address already in use" exceptions --- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f091e5ace1022..b4e8e157f0b3c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1357,7 +1357,10 @@ private[spark] object Utils extends Logging { return (service, port) } catch { case e: BindException => - if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { + if (!e.getMessage.contains("Address already in use")) { + throw e + } + if (offset >= maxRetries) { val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" 
val exception = new BindException(exceptionMessage) From e111d080b4a7c0103c30b3a6e29c058d0ac4c3d0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 18:46:11 -0700 Subject: [PATCH 20/39] Add names for UI services --- .../org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2 +- .../org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 3 ++- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2 +- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 +++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 16aa0493370dd..c696843365d1a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) - extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging { + extends WebUI(master.securityMgr, requestedPort, master.conf, "MasterUI") with Logging { val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 0ad2edba2227f..5cd658323c098 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.worker.Worker +import org.apache.spark.deploy.worker.ui.WorkerWebUI._ import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.AkkaUtils @@ -34,7 +35,7 @@ class WorkerWebUI( val worker: Worker, val workDir: File, port: Option[Int] = None) - extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf) + extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, "WorkerUI") with Logging { val timeout = AkkaUtils.askTimeout(worker.conf) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 097a1b81e1dd1..6c788a37dc70b 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -36,7 +36,7 @@ private[spark] class SparkUI( val listenerBus: SparkListenerBus, var appName: String, val basePath: String = "") - extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath) + extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 856273e1d4e21..5f52f95088007 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -39,7 +39,8 @@ private[spark] abstract class WebUI( securityManager: SecurityManager, port: Int, conf: SparkConf, - basePath: String = "") + basePath: String = "", + name: String = "") extends Logging { protected val tabs = ArrayBuffer[WebUITab]() @@ -97,7 +98,7 @@ private[spark] 
abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => From 3f8e51bbb82669b43d7d52ece09ac957b35e994e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 18:46:29 -0700 Subject: [PATCH 21/39] Correct erroneous docs... --- docs/spark-standalone.md | 56 ++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index e80f2ec9861ed..29628747de410 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -311,7 +311,7 @@ configure those ports. Browser - Standalone Cluster Master + Master 8080 Web UI master.ui.port @@ -343,59 +343,59 @@ configure those ports. - Application - Standalone Cluster Master + Driver
Worker + Master 7077 - Submit job to cluster - spark.driver.port - Akka-based. Set to "0" to choose a port randomly + Submit job to cluster
Join cluster + SPARK_MASTER_PORT + Akka-based. Set to "0" to choose a port randomly. + Master Worker - Standalone Cluster Master - 7077 - Join cluster - spark.driver.port - Akka-based. Set to "0" to choose a port randomly + (random) + Schedule executors + SPARK_WORKER_PORT + Akka-based. Set to "0" to choose a port randomly. - Application - Worker + Executor
Master + Driver (random) - Join cluster - SPARK_WORKER_PORT (standalone cluster) - Akka-based + Connect to application
Notify Master and executor state changes + spark.driver.port + Akka-based. Set to "0" to choose a port randomly. - Worker - Application + Executor + Driver (random) File server for files and jars spark.fileserver.port Jetty-based - Worker - Application + Executor + Driver (random) HTTP Broadcast spark.broadcast.port - Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager - instead + Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager + instead. - Worker - Spark Shell + Executor + Driver (random) - Class file server (Spark Shell only) + Class file server spark.replClassServer.port - Jetty-based + Jetty-based. Only used in Spark shells. - Worker - Other Workers + Executor + Executor (random) Block Manager port spark.blockManager.port From 4d9e6f348cc408064173a91ecf9b509804eadf01 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 19:32:31 -0700 Subject: [PATCH 22/39] Fix super subtle bug We were passing the Master/Worker UI names as base paths for Jetty to use, which is obviously wrong in hindsight. --- .../scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2 +- .../scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index c696843365d1a..d86ec1e03e45c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) - extends WebUI(master.securityMgr, requestedPort, master.conf, "MasterUI") with Logging { + extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging { val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 5cd658323c098..18de37364c31b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -35,7 +35,7 @@ class WorkerWebUI( val worker: Worker, val workDir: File, port: Option[Int] = None) - extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, "WorkerUI") + extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI") with Logging { val timeout = AkkaUtils.askTimeout(worker.conf) From 8d836e6725120edaa36f4dd84ef9b4e88fc91f83 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 21:26:51 -0700 Subject: [PATCH 23/39] Also document SPARK_{MASTER/WORKER}_WEBUI_PORT The existing documentation uses {master/worker}.ui.port, which doesn't actually work because SparkConf doesn't pick up system properties that are not prefixed by spark (SPARK-2857). We should at least document a working alternative. --- docs/spark-standalone.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 29628747de410..cde11c5cf06cf 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -314,7 +314,15 @@ configure those ports. Master 8080 Web UI - master.ui.port + master.ui.port
SPARK_MASTER_WEBUI_PORT
+ Jetty-based + + + Browser + Worker + 8081 + Web UI + worker.ui.port
SPARK_WORKER_WEBUI_PORT
Jetty-based @@ -333,14 +341,6 @@ configure those ports. spark.history.ui.port Jetty-based - - Browser - Worker - 8081 - Web UI - worker.ui.port - Jetty-based - Driver
Worker From 6016e774e4034e0179f51042638bfd48b21b62e0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 21:59:14 -0700 Subject: [PATCH 24/39] Add spark.executor.port --- core/src/main/scala/org/apache/spark/SparkConf.scala | 5 ++++- .../spark/executor/CoarseGrainedExecutorBackend.scala | 5 +++-- docs/spark-standalone.md | 8 ++++++++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 38700847c80f4..cff12234831b6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -309,6 +309,9 @@ private[spark] object SparkConf { * the scheduler, while the rest of the spark configs can be inherited from the driver later. */ def isExecutorStartupConf(name: String): Boolean = { - isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") + isAkkaConf(name) || + name.startsWith("spark.akka") || + name.startsWith("spark.auth") || + name == "spark.executor.port" } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index af736de405397..1f46a0f176490 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf + val port = executorConf.getInt("spark.executor.port", 0) val (fetcher, _) = AkkaUtils.createActorSystem( - "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf)) + "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf)) val driver = fetcher.actorSelection(driverUrl) val timeout = AkkaUtils.askTimeout(executorConf) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) @@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Create a new ActorSystem using driver's Spark properties to run the backend. val driverConf = new SparkConf().setAll(props) val (actorSystem, boundPort) = AkkaUtils.createActorSystem( - "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf)) + "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index cde11c5cf06cf..b6d2b2aa475d4 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -366,6 +366,14 @@ configure those ports. spark.driver.port Akka-based. Set to "0" to choose a port randomly. + + Driver + Executor + (random) + Schedule tasks + spark.executor.port + Akka-based. Set to "0" to choose a port randomly. 
+ From 986835897b42c4b7ab0d21878d4d825aa530f59c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 22:32:09 -0700 Subject: [PATCH 25/39] Add a few miscellaneous ports --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 3 ++- .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 3 ++- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 17c507af2652d..4b3c91fe7783a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -146,6 +146,7 @@ object Client { } val conf = new SparkConf() + val port = conf.getInt("spark.standalone.client.port", 0) // TODO: document this val driverArgs = new ClientArguments(args) if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { @@ -158,7 +159,7 @@ object Client { // TODO: See if we can initialize akka so return messages are sent back using the same TCP // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) + "driverClient", Utils.localHostName(), port, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 05e242e6df702..03da3feec3f0a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -30,8 +30,9 @@ object DriverWrapper { args.toList match { case workerUrl :: mainClass :: extraArgs => val conf = new SparkConf() + val watcherPort = conf.getInt("spark.worker.watcher.port", 0) // TODO: document this val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - Utils.localHostName(), 0, conf, new SecurityManager(conf)) + Utils.localHostName(), watcherPort, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") // Delegate to supplied main class diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1bb1b4aae91bb..6a8455e128535 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,7 +84,8 @@ private[spark] class Executor( // Initialize Spark environment (using system properties read above) private val env = { if (!isLocal) { - val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, + val port = conf.getInt("spark.executor.env.port", 0) // TODO: document this + val _env = SparkEnv.create(conf, executorId, slaveHostname, port, isDriver = false, isLocal = false) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) From 2551eb2ae9cf5b016a384892eeca8b201d131657 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 10:44:51 -0700 Subject: [PATCH 26/39] Remove spark.worker.watcher.port --- .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 03da3feec3f0a..05e242e6df702 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -30,9 +30,8 @@ object DriverWrapper { args.toList match { case workerUrl :: mainClass :: extraArgs => val conf = new SparkConf() - val watcherPort = conf.getInt("spark.worker.watcher.port", 0) // TODO: document this val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - Utils.localHostName(), watcherPort, conf, new SecurityManager(conf)) + Utils.localHostName(), 0, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") // Delegate to supplied main class From b565079b4515f1c0abb1176805348e9c80b8bb64 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 11:02:40 -0700 Subject: [PATCH 27/39] Add spark.ports.maxRetries --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 - .../main/scala/org/apache/spark/util/Utils.scala | 13 ++++++++++++- docs/configuration.md | 7 +++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 051814f074c80..100033f7f8962 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -22,7 +22,6 @@ import java.net.Socket import scala.collection.JavaConversions._ import scala.collection.mutable -import scala.concurrent.Await import scala.util.Properties import akka.actor._ diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b4e8e157f0b3c..f4efca7ce825f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1331,6 +1331,17 @@ private[spark] object Utils extends Logging { .map { case (k, v) => s"-D$k=$v" } } + /** + * Default number of retries in binding to a port. + */ + val portMaxRetries: Int = { + // SparkEnv may be null during tests + Option(SparkEnv.get) + .flatMap(_.conf.getOption("spark.ports.maxRetries")) + .map(_.toInt) + .getOrElse(16) + } + /** * Attempt to start a service on the given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt. @@ -1346,7 +1357,7 @@ private[spark] object Utils extends Logging { startPort: Int, startService: Int => (T, Int), serviceName: String = "", - maxRetries: Int = 3): (T, Int) = { + maxRetries: Int = portMaxRetries): (T, Int) = { val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port diff --git a/docs/configuration.md b/docs/configuration.md index 870343f1c0bd2..1cf4903835838 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -567,6 +567,13 @@ Apart from these, the following properties are also available, and may be useful Port for the driver to listen on. + + spark.port.maxRetries + 16 + + Maximum number of retries when binding to a port before giving up. 
+ + spark.akka.frameSize 10 From de1b207abd0a02089036b1c19e2361ea55df9b7e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 11:31:47 -0700 Subject: [PATCH 28/39] Update docs to reflect new ports --- docs/configuration.md | 53 +++++++++++++++ docs/security.md | 139 ++++++++++++++++++++++++++++++++++++++- docs/spark-standalone.md | 114 +------------------------------- 3 files changed, 192 insertions(+), 114 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1cf4903835838..21e766190a079 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -558,6 +558,7 @@ Apart from these, the following properties are also available, and may be useful (local hostname) Hostname or IP address for the driver to listen on. + This is used for communicating with the executors and the standalone Master. @@ -565,6 +566,58 @@ Apart from these, the following properties are also available, and may be useful (random) Port for the driver to listen on. + This is used for communicating with the executors and the standalone Master. + + + + spark.fileserver.port + (random) + + Port for the driver's HTTP file server to listen on. + + + + spark.broadcast.port + (random) + + Port for the driver's HTTP broadcast server to listen on. + This is not relevant for torrent broadcast. + + + + spark.replClassServer.port + (random) + + Port for the driver's HTTP class server to listen on. + This is only relevant for Spark shell. + + + + spark.blockManager.port + (random) + + Port for all block managers to listen on. These exist on both the driver and the executors. + + + + spark.executor.port + (random) + + Port for the executor to listen on. This is used for communicating with the driver. + + + + spark.executor.env.port + (random) + + Port used by the executor's actor system for various purposes. + + + + spark.standalone.cluster.port + (random) + + Port used by org.apache.spark.deploy.Client in standalone cluster deploy mode. diff --git a/docs/security.md b/docs/security.md index 90ba678033b19..b1755b3493beb 100644 --- a/docs/security.md +++ b/docs/security.md @@ -7,14 +7,147 @@ Spark currently supports authentication via a shared secret. Authentication can * For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. * For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. +* **IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.* + +## Web UI The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.ui.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. 
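To make the UI-security settings above concrete, a driver could wire them together roughly as follows (a minimal sketch; the filter class and user names are placeholders, not something this patch series provides):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: com.example.BasicAuthFilter stands in for whatever
// javax.servlet.Filter implementation is on the driver's classpath.
val conf = new SparkConf()
  .setAppName("secured-app")
  .set("spark.authenticate", "true")               // enable shared-secret authentication
  .set("spark.authenticate.secret", "my-secret")   // required outside of YARN deployments
  .set("spark.ui.filters", "com.example.BasicAuthFilter")
  .set("spark.ui.acls.enable", "true")
  .set("spark.ui.view.acls", "alice,bob")          // users allowed to view the UI

val sc = new SparkContext(conf)
```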
On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +## Event Logging + If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access. -**IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.* +## Configuring Ports for Network Security + +Spark makes heavy use of the network, and some environments have strict requirements for using tight +firewall settings. Below are the primary ports that Spark uses for its communication and how to +configure those ports. + +### Standalone mode only + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th>
+    <th>Configuration Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td><td>Standalone Master</td><td>8080</td><td>Web UI</td>
+    <td>master.ui.port<br>SPARK_MASTER_WEBUI_PORT</td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Browser</td><td>Standalone Worker</td><td>8081</td><td>Web UI</td>
+    <td>worker.ui.port<br>SPARK_WORKER_WEBUI_PORT</td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Driver<br>Standalone Worker</td><td>Standalone Master</td><td>7077</td>
+    <td>Submit job to cluster<br>Join cluster</td><td>SPARK_MASTER_PORT</td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Standalone Master</td><td>Standalone Worker</td><td>(random)</td>
+    <td>Schedule executors</td><td>SPARK_WORKER_PORT</td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+ +### All cluster managers + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th>
+    <th>Configuration Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td><td>Application</td><td>4040</td><td>Web UI</td>
+    <td>spark.ui.port</td><td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Browser</td><td>History Server</td><td>18080</td><td>Web UI</td>
+    <td>spark.history.ui.port</td><td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor<br>Standalone Master</td><td>Driver</td><td>(random)</td>
+    <td>Connect to application<br>Notify executor state changes</td>
+    <td>spark.driver.port</td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Driver</td><td>Executor</td><td>(random)</td><td>Schedule tasks</td>
+    <td>spark.executor.port</td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Driver</td><td>Executor</td><td>(random)</td><td>Executor actor system port</td>
+    <td>spark.executor.env.port</td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Executor</td><td>Driver</td><td>(random)</td><td>File server for files and jars</td>
+    <td>spark.fileserver.port</td><td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor</td><td>Driver</td><td>(random)</td><td>HTTP Broadcast</td>
+    <td>spark.broadcast.port</td>
+    <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
+    instead.</td>
+  </tr>
+  <tr>
+    <td>Executor</td><td>Driver</td><td>(random)</td><td>Class file server</td>
+    <td>spark.replClassServer.port</td>
+    <td>Jetty-based. Only used in Spark shells.</td>
+  </tr>
+  <tr>
+    <td>Executor / Driver</td><td>Executor / Driver</td><td>(random)</td>
+    <td>Block Manager port</td><td>spark.blockManager.port</td>
+    <td>Raw socket via ServerSocketChannel</td>
+  </tr>
-See the [configuration page](configuration.html) for more details on the security configuration parameters. -See org.apache.spark.SecurityManager for implementation details about security. +See the [configuration page](configuration.html) for more details on the security configuration +parameters, and +org.apache.spark.SecurityManager for implementation details about security. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index b6d2b2aa475d4..b25b02d3143bb 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -299,118 +299,10 @@ You can run Spark alongside your existing Hadoop cluster by just launching it as # Configuring Ports for Network Security -Spark makes heavy use of the network, and some environments have strict requirements for using tight -firewall settings. Below are the primary ports that Spark uses for its communication and how to -configure those ports. +Spark makes heavy use of the network, and some environments have strict requirements for using +tight firewall settings. For a complete list of ports to configure, see the [security page] +(security.html#configuring-ports-for-network-security). - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-  <tr>
-    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th>
-    <th>Configuration Setting</th><th>Notes</th>
-  </tr>
-  <tr>
-    <td>Browser</td><td>Master</td><td>8080</td><td>Web UI</td>
-    <td>master.ui.port<br>SPARK_MASTER_WEBUI_PORT</td><td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td><td>Worker</td><td>8081</td><td>Web UI</td>
-    <td>worker.ui.port<br>SPARK_WORKER_WEBUI_PORT</td><td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td><td>Application</td><td>4040</td><td>Web UI</td>
-    <td>spark.ui.port</td><td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td><td>History Server</td><td>18080</td><td>Web UI</td>
-    <td>spark.history.ui.port</td><td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Driver<br>Worker</td><td>Master</td><td>7077</td>
-    <td>Submit job to cluster<br>Join cluster</td><td>SPARK_MASTER_PORT</td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Master</td><td>Worker</td><td>(random)</td><td>Schedule executors</td>
-    <td>SPARK_WORKER_PORT</td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Executor<br>Master</td><td>Driver</td><td>(random)</td>
-    <td>Connect to application<br>Notify Master and executor state changes</td>
-    <td>spark.driver.port</td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Driver</td><td>Executor</td><td>(random)</td><td>Schedule tasks</td>
-    <td>spark.executor.port</td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Executor</td><td>Driver</td><td>(random)</td><td>File server for files and jars</td>
-    <td>spark.fileserver.port</td><td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Executor</td><td>Driver</td><td>(random)</td><td>HTTP Broadcast</td>
-    <td>spark.broadcast.port</td>
-    <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
-    instead.</td>
-  </tr>
-  <tr>
-    <td>Executor</td><td>Driver</td><td>(random)</td><td>Class file server</td>
-    <td>spark.replClassServer.port</td>
-    <td>Jetty-based. Only used in Spark shells.</td>
-  </tr>
-  <tr>
-    <td>Executor</td><td>Executor</td><td>(random)</td><td>Block Manager port</td>
-    <td>spark.blockManager.port</td>
-    <td>Raw socket via ServerSocketChannel</td>
-  </tr>
# High Availability From e837cde2fe6d9aa81522b328ce6e2b66d6fa1c3c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 11:33:54 -0700 Subject: [PATCH 29/39] Remove outdated TODOs --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 4b3c91fe7783a..0adb23d5eedc0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -146,7 +146,7 @@ object Client { } val conf = new SparkConf() - val port = conf.getInt("spark.standalone.client.port", 0) // TODO: document this + val port = conf.getInt("spark.standalone.client.port", 0) val driverArgs = new ClientArguments(args) if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 85b7fba2a9a22..ee4b8aa519603 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,7 +84,7 @@ private[spark] class Executor( // Initialize Spark environment (using system properties read above) private val env = { if (!isLocal) { - val port = conf.getInt("spark.executor.env.port", 0) // TODO: document this + val port = conf.getInt("spark.executor.env.port", 0) val _env = SparkEnv.create(conf, executorId, slaveHostname, port, isDriver = false, isLocal = false) SparkEnv.set(_env) From cb3be88ce9ca5ffed946ae5f98297ab7c19f0fe4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 11:49:33 -0700 Subject: [PATCH 30/39] Various doc fixes (broken link, format etc.) --- docs/security.md | 12 ++++++------ docs/spark-standalone.md | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/security.md b/docs/security.md index 7117d51f3086a..591ce4b7b7cbd 100644 --- a/docs/security.md +++ b/docs/security.md @@ -39,7 +39,7 @@ configure those ports. Standalone Master 8080 Web UI - spark.master.ui.port
SPARK_MASTER_WEBUI_PORT
+ spark.master.ui.port /
SPARK_MASTER_WEBUI_PORT
Jetty-based. Standalone mode only. @@ -47,14 +47,14 @@ configure those ports. Standalone Worker 8081 Web UI - spark.worker.ui.port
SPARK_WORKER_WEBUI_PORT
+ spark.worker.ui.port /
SPARK_WORKER_WEBUI_PORT
Jetty-based. Standalone mode only. - Driver
Standalone Worker + Driver /
Standalone Worker Standalone Master 7077 - Submit job to cluster
Join cluster + Submit job to cluster /
Join cluster SPARK_MASTER_PORT Akka-based. Set to "0" to choose a port randomly. Standalone mode only. @@ -92,10 +92,10 @@ configure those ports. Jetty-based - Executor
Standalone Master + Executor /
Standalone Master Driver (random) - Connect to application
Notify executor state changes + Connect to application /
Notify executor state changes spark.driver.port Akka-based. Set to "0" to choose a port randomly. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index d4c41ba2b2a18..c791c81f8bfd0 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -300,14 +300,14 @@ You can run Spark alongside your existing Hadoop cluster by just launching it as # Configuring Ports for Network Security Spark makes heavy use of the network, and some environments have strict requirements for using -tight firewall settings. For a complete list of ports to configure, see the [security page] -(security.html#configuring-ports-for-network-security). +tight firewall settings. For a complete list of ports to configure, see the +[security page](security.html#configuring-ports-for-network-security). # High Availability By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below. -## Standby Masters with ZooKeeper +# Standby Masters with ZooKeeper **Overview** @@ -347,7 +347,7 @@ There's an important distinction to be made between "registering with a Master" Due to this property, new Masters can be created at any time, and the only thing you need to worry about is that _new_ applications and Workers can find it to register with in case it becomes the leader. Once registered, you're taken care of. -## Single-Node Recovery with Local File System +# Single-Node Recovery with Local File System **Overview** From 1d2d5c61a80d1be19a4ef37693a2c4602062711e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 13:05:43 -0700 Subject: [PATCH 31/39] Fix ports for standalone cluster mode --- core/src/main/scala/org/apache/spark/SparkConf.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index eebb1f4102463..13f0bff7ee507 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -326,6 +326,11 @@ private[spark] object SparkConf { isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") || - name == "spark.executor.port" + isSparkPortConf(name) } + + /** + * Return whether the given config is a Spark port config. + */ + def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port") } From 86461e2d5e6fbefebb3c652e0ea2cbd0823c7610 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 14:37:07 -0700 Subject: [PATCH 32/39] Remove spark.executor.env.port and spark.standalone.client.port Verified on a cluster that these are not actually necessary. This is because akka is the one that initiates the connection, and all responses use the same TCP connection. 
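The reasoning above can be illustrated with a plain-socket sketch (an analogy, not Spark code; the host name and port are placeholders): the client side dials out from an OS-assigned ephemeral port, and the reply arrives over the same established connection, so no fixed listening port is needed on the client.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.Socket

// The "client" (driver client / executor bootstrap in the analogy) never binds
// a well-known port; it only connects out.
val socket = new Socket("master.example.com", 7077)   // placeholder endpoint
println(s"Connected from ephemeral local port ${socket.getLocalPort}")

val out = new PrintWriter(socket.getOutputStream, true)
val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
out.println("request")
val reply = in.readLine()                              // comes back on the same socket
socket.close()
```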
--- .../scala/org/apache/spark/deploy/Client.scala | 5 +---- .../scala/org/apache/spark/executor/Executor.scala | 3 +-- docs/configuration.md | 14 -------------- docs/security.md | 8 -------- 4 files changed, 2 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 0adb23d5eedc0..c07003784e8ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -146,7 +146,6 @@ object Client { } val conf = new SparkConf() - val port = conf.getInt("spark.standalone.client.port", 0) val driverArgs = new ClientArguments(args) if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { @@ -156,10 +155,8 @@ object Client { conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) Logger.getRootLogger.setLevel(driverArgs.logLevel) - // TODO: See if we can initialize akka so return messages are sent back using the same TCP - // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), port, conf, new SecurityManager(conf)) + "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ee4b8aa519603..c2b9c660ddaec 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,8 +84,7 @@ private[spark] class Executor( // Initialize Spark environment (using system properties read above) private val env = { if (!isLocal) { - val port = conf.getInt("spark.executor.env.port", 0) - val _env = SparkEnv.create(conf, executorId, slaveHostname, port, + val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, isDriver = false, isLocal = false) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) diff --git a/docs/configuration.md b/docs/configuration.md index 4d4b1866f7f3e..85a9bfc17580f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -606,20 +606,6 @@ Apart from these, the following properties are also available, and may be useful Port for the executor to listen on. This is used for communicating with the driver. - - spark.executor.env.port - (random) - - Port used by the executor's actor system for various purposes. - - - - spark.standalone.cluster.port - (random) - - Port used by org.apache.spark.deploy.Client in standalone cluster deploy mode. - - spark.port.maxRetries 16 diff --git a/docs/security.md b/docs/security.md index 591ce4b7b7cbd..ec0523184d665 100644 --- a/docs/security.md +++ b/docs/security.md @@ -107,14 +107,6 @@ configure those ports. spark.executor.port Akka-based. Set to "0" to choose a port randomly. - - Driver - Executor - (random) - Executor actor system port - spark.executor.env.port - Akka-based. Set to "0" to choose a port randomly. 
- Executor Driver From a2dd05c53182bf15d6f7c662e435acbd632a1f11 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 14:38:26 -0700 Subject: [PATCH 33/39] Patrick's comment nit --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 85a9bfc17580f..a182a07d46a85 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -589,7 +589,7 @@ Apart from these, the following properties are also available, and may be useful (random) Port for the driver's HTTP class server to listen on. - This is only relevant for Spark shell. + This is only relevant for the Spark shell. From d502e5f9fdf0045dad1cb4a6dd50c899b9de820f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 16:21:57 -0700 Subject: [PATCH 34/39] Handle port collisions when creating Akka systems This requires us to handle exceptions thrown more carefully, because akka throws its own exceptions that are not java.net.BindException. We workaround this by traversing the Exception causality tree to find a java.net.BindException with an "Address already in use" message. --- .../org/apache/spark/util/AkkaUtils.scala | 24 +++++++++++++++---- .../scala/org/apache/spark/util/Utils.scala | 20 ++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index feafd654e9e71..d6afb73b74242 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} -import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem} +import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask import com.typesafe.config.ConfigFactory @@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging { * If indestructible is set to true, the Actor System will continue running in the event * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ - def createActorSystem(name: String, host: String, port: Int, - conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { + def createActorSystem( + name: String, + host: String, + port: Int, + conf: SparkConf, + securityManager: SecurityManager): (ActorSystem, Int) = { + val startService: Int => (ActorSystem, Int) = { actualPort => + doCreateActorSystem(name, host, actualPort, conf, securityManager) + } + Utils.startServiceOnPort(port, startService, name) + } + + private def doCreateActorSystem( + name: String, + host: String, + port: Int, + conf: SparkConf, + securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeout = conf.getInt("spark.akka.timeout", 100) - val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f4efca7ce825f..accdf2e4439a3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1367,10 +1367,7 @@ private[spark] object Utils extends Logging { logInfo(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { - case e: BindException => - if (!e.getMessage.contains("Address already in use")) { - throw e - } + case e: Exception if isBindCollision(e) => if (offset >= maxRetries) { val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" @@ -1387,4 +1384,19 @@ private[spark] object Utils extends Logging { throw new SparkException(s"Failed to start service on port $startPort") } + /** + * Return whether the exception is caused by an address-port collision when binding. + */ + private def isBindCollision(exception: Throwable): Boolean = { + exception match { + case e: BindException => + if (e.getMessage != null && e.getMessage.contains("Address already in use")) { + return true + } + isBindCollision(e.getCause) + case e: Exception => isBindCollision(e.getCause) + case _ => false + } + } + } From 93d359f7a36d7e4bc89e5e3c7d771a2248c61888 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 16:23:35 -0700 Subject: [PATCH 35/39] Executors connect to wrong port when collision occurs This commit fixes this by setting "spark.driver.port" to the actual port the akka system bound to. Note that we already do this for port 0, where the original port is not the same as the bound port. However, we still face the same issue if we end up using port n + 1. --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 05c20a17830a3..9d4edeb6d96cf 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -150,10 +150,10 @@ object SparkEnv extends Logging { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, securityManager = securityManager) - // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), - // figure out which port number Akka actually bound to and set spark.driver.port to it. 
- if (isDriver && port == 0) { - conf.set("spark.driver.port", boundPort.toString) + // Figure out which port Akka actually bound to in case the original port is 0 or occupied. + // This is so that we tell the executors the correct port to connect to. + if (isDriver) { + conf.set("spark.driver.port", boundPort.toString) } // Create an instance of the class named by the given Java system property, or by From b97b02a070f1b6d8a076f24da6a50bef700a1ec7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 17:12:26 -0700 Subject: [PATCH 36/39] Minor fixes --- core/src/main/scala/org/apache/spark/HttpServer.scala | 2 +- .../scala/org/apache/spark/network/ConnectionManager.scala | 3 ++- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 3 +-- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 +++---- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index ee018ac390f6e..912558d0cab7d 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -72,7 +72,7 @@ private[spark] class HttpServer( private def doStart(startPort: Int): (Server, Int) = { val server = new Server() val connector = new SocketConnector - connector.setMaxIdleTime(60*1000) + connector.setMaxIdleTime(60 * 1000) connector.setSoLingerTime(-1) connector.setPort(startPort) server.addConnector(connector) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index f837474b0c6ef..4c00225280cce 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -42,7 +42,8 @@ private[spark] class ConnectionManager( port: Int, conf: SparkConf, securityManager: SecurityManager, - name: String = "Connection manager") extends Logging { + name: String = "Connection manager") + extends Logging { class MessageStatus( val message: Message, diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 220b1bf234aaf..29e9cf947856f 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -199,8 +199,7 @@ private[spark] object JettyUtils extends Logging { } } - val (server, boundPort) = - Utils.startServiceOnPort[Server](port, connect, serverName, maxRetries = 10) + val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName) ServerInfo(server, boundPort, collection) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index accdf2e4439a3..af656837fbe8b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1344,14 +1344,13 @@ private[spark] object Utils extends Logging { /** * Attempt to start a service on the given port, or fail after a number of attempts. - * Each subsequent attempt uses 1 + the port used in the previous attempt. + * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * * @param startPort The initial port to start the service on. * @param maxRetries Maximum number of retries to attempt. * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. 
* @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. - * @throws SparkException When unable to start the service after a given number of attempts */ def startServiceOnPort[T]( startPort: Int, @@ -1381,13 +1380,13 @@ private[spark] object Utils extends Logging { } } // Should never happen - throw new SparkException(s"Failed to start service on port $startPort") + throw new SparkException(s"Failed to start service$serviceString on port $startPort") } /** * Return whether the exception is caused by an address-port collision when binding. */ - private def isBindCollision(exception: Throwable): Boolean = { + def isBindCollision(exception: Throwable): Boolean = { exception match { case e: BindException => if (e.getMessage != null && e.getMessage.contains("Address already in use")) { From 523c30e3dc25a9a941749f9c97f9d157564fb49b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 17:12:31 -0700 Subject: [PATCH 37/39] Add test for isBindCollision --- .../org/apache/spark/util/UtilsSuite.scala | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 1ee936bc78f49..70d423ba8a04d 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import scala.util.Random import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} -import java.net.URI +import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} import com.google.common.base.Charsets @@ -265,4 +265,36 @@ class UtilsSuite extends FunSuite { Array("hdfs:/a.jar", "s3:/another.jar")) } + test("isBindCollision") { + // Negatives + assert(!Utils.isBindCollision(null)) + assert(!Utils.isBindCollision(new Exception)) + assert(!Utils.isBindCollision(new Exception(new Exception))) + assert(!Utils.isBindCollision(new Exception(new BindException))) + assert(!Utils.isBindCollision(new Exception(new BindException("Random message")))) + + // Positives + val be = new BindException("Address already in use") + val be1 = new Exception(new BindException("Address already in use")) + val be2 = new Exception(new Exception(new BindException("Address already in use"))) + assert(Utils.isBindCollision(be)) + assert(Utils.isBindCollision(be1)) + assert(Utils.isBindCollision(be2)) + + // Actual bind exception + var server1: ServerSocket = null + var server2: ServerSocket = null + try { + server1 = new java.net.ServerSocket(0) + server2 = new java.net.ServerSocket(server1.getLocalPort) + } catch { + case e: Exception => + assert(e.isInstanceOf[java.net.BindException]) + assert(Utils.isBindCollision(e)) + } finally { + Option(server1).foreach(_.close()) + Option(server2).foreach(_.close()) + } + } + } From 7da0493fc63e58ea95016f86366cdeb8a0ffd53b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 20:20:14 -0700 Subject: [PATCH 38/39] Fix tests It seems that the tests assume we can keep opening UI ports. Our existing default for spark.ports.maxRetries does not tolerate this, however, so the test process throws an exception and returns exit code 1. 
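For readers following the retry behaviour that these tests exercise, here is a standalone sketch of binding with incremental retries (an illustration using java.net.ServerSocket, not the actual Utils.startServiceOnPort implementation):

```scala
import java.net.{BindException, ServerSocket}

// Try startPort, then startPort + 1, ..., giving up after maxRetries extra attempts.
def bindWithRetries(startPort: Int, maxRetries: Int): ServerSocket = {
  for (offset <- 0 to maxRetries) {
    // Port 0 asks the OS for an ephemeral port, so there is nothing to increment.
    val candidate = if (startPort == 0) 0 else startPort + offset
    try {
      return new ServerSocket(candidate)
    } catch {
      case _: BindException =>
        println(s"Port $candidate is already in use, trying the next one")
    }
  }
  throw new BindException(
    s"Could not bind starting at port $startPort after ${maxRetries + 1} attempts")
}

val server = bindWithRetries(4040, 16)   // e.g. try 4040 through 4056
println(s"Bound to port ${server.getLocalPort}")
server.close()
```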
--- .../main/scala/org/apache/spark/util/Utils.scala | 14 +++++++++----- project/SparkBuild.scala | 1 + 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index af656837fbe8b..c60be4f8a11d2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1335,11 +1335,15 @@ private[spark] object Utils extends Logging { * Default number of retries in binding to a port. */ val portMaxRetries: Int = { - // SparkEnv may be null during tests - Option(SparkEnv.get) - .flatMap(_.conf.getOption("spark.ports.maxRetries")) - .map(_.toInt) - .getOrElse(16) + if (sys.props.contains("spark.testing")) { + // Set a higher number of retries for tests... + sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100) + } else { + Option(SparkEnv.get) + .flatMap(_.conf.getOption("spark.ports.maxRetries")) + .map(_.toInt) + .getOrElse(16) + } } /** diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index aac621fe53938..03eea7c8b15a2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -330,6 +330,7 @@ object TestSettings { fork := true, javaOptions in Test += "-Dspark.test.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", + javaOptions in Test += "-Dspark.ports.maxRetries=100", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") .map { case (k,v) => s"-D$k=$v" }.toSeq, From 8a6b820f328cc79dc1f730ed1ecf3fb6215458ac Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 5 Aug 2014 21:41:27 -0700 Subject: [PATCH 39/39] Use a random UI port during tests so as to avoid all the conflicts there are. --- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 03eea7c8b15a2..40b588512ff08 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -331,6 +331,7 @@ object TestSettings { javaOptions in Test += "-Dspark.test.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dspark.ports.maxRetries=100", + javaOptions in Test += "-Dspark.ui.port=0", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") .map { case (k,v) => s"-D$k=$v" }.toSeq,
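Taken together, the series makes every per-service port configurable. As a closing illustration, a user locking down a cluster behind a firewall might pin the ports like this (a sketch; the port numbers are arbitrary examples, and spark.ports.maxRetries is the key read by the code in this series):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Pin each service to a known port so that only a small set of inbound
// firewall rules is needed. Any port set to "0" is chosen randomly instead.
val conf = new SparkConf()
  .setAppName("firewalled-app")
  .set("spark.driver.port",          "7001")  // driver's Akka actor system
  .set("spark.fileserver.port",      "7002")  // driver's HTTP file server (files/jars)
  .set("spark.broadcast.port",       "7003")  // driver's HTTP broadcast server
  .set("spark.replClassServer.port", "7004")  // class server (Spark shell only)
  .set("spark.blockManager.port",    "7005")  // block managers on driver and executors
  .set("spark.executor.port",        "7006")  // each executor's Akka actor system
  .set("spark.ports.maxRetries",     "16")    // on collision, try port, port+1, ... port+16

val sc = new SparkContext(conf)
```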