diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 491d8307c8c0b..929b6380f51a9 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.apache.spark.network.PortManager import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} @@ -47,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File, private var server: Server = null private var port: Int = localPort - private def startOnPort(startPort: Int): Tuple2[Server,Int] = { + private def startOnPort(startPort: Int): Server = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -78,26 +79,7 @@ private[spark] class HttpServer(resourceBase: File, server.start() val actualPort = server.getConnectors()(0).getLocalPort() - return (server, actualPort) - } - - private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = { - for( offset <- 0 to maxRetries) { - try { - val (server, actualPort) = startOnPort(startPort + offset) - return (server, actualPort) - } catch { - case e: java.net.BindException => { - if (!e.getMessage.contains("Address already in use") || - offset == (maxRetries-1)) { - throw e - } - logInfo("Could not bind on port: " + (startPort + offset)) - } - case e: Exception => throw e - } - } - return (null, -1) + server } def start() { @@ -105,7 +87,7 @@ private[spark] class HttpServer(resourceBase: File, throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - val (actualServer, actualPort) = startWithIncrements(localPort, 3) + val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index d3e575cb5ef29..e3ff67829b785 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -102,24 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - def bindWithIncrement(port: Int, maxTries: Int = 3) { - for( offset <- 0 until maxTries ) { - try { - serverChannel.socket.bind(new InetSocketAddress(port + offset)) - return - } catch { - case e: java.net.BindException => { - if(!e.getMessage.contains("Address already in use") || - offset == maxTries) { - throw e - } - logInfo("Could not bind on port: " + (port + offset)) - } - case e: Exception => throw e - } - } + private def startService(port: Int) = { + serverChannel.socket.bind(new InetSocketAddress(port)) + serverChannel } - bindWithIncrement(port, 3) + PortManager.startWithIncrements(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala new file mode 100644 index 0000000000000..82cafa637f3dc --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/PortManager.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network + +import java.net.InetSocketAddress + +import org.apache.spark.{Logging, SparkException} +import org.eclipse.jetty.server.Server + +private[spark] object PortManager extends Logging +{ + + /** + * Start service on given port, or attempt to fall back to the n+1 port for a certain number of + * retries + * + * @param startPort + * @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 + * total attempts, on ports n, n+1, n+2, and n+3 + * @param startService Function to start service on a given port. Expected to throw a java.net + * .BindException if the port is already in use + * @tparam T + * @throws SparkException When unable to start service in the given number of attempts + * @return + */ + def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => T): + (T, Int) = { + for( offset <- 0 to maxRetries) { + val tryPort = startPort + offset + try { + val service: T = startService(tryPort) + return (service, tryPort) + } catch { + case e: java.net.BindException => { + if (!e.getMessage.contains("Address already in use") || + offset == (maxRetries-1)) { + throw e + } + logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort+1)) + } + case e: Exception => throw e + } + } + throw new SparkException(s"Couldn't start service on port $startPort") + } +}