Skip to content

Commit

Permalink
Add fallover increment logic for HttpServer
Browse files Browse the repository at this point in the history
  • Loading branch information
ash211 committed Jul 25, 2014
1 parent c5a0568 commit cad16da
Showing 1 changed file with 52 additions and 29 deletions.
81 changes: 52 additions & 29 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,63 @@ private[spark] class HttpServer(resourceBase: File,
private var server: Server = null
private var port: Int = localPort

private def startOnPort(startPort: Int) {
val server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
val actualPort = server.getConnectors()(0).getLocalPort()

return (server, actualPort)
}

private def startWithIncrements(startPort: Int, maxTries: Int) {
for( tryPort <- startPort until (startPort+maxTries)) {
try {
val (server, actualPort) = startOnPort(startPort)
return (server, actualPort)
} catch {
case e: java.net.BindException => {
if (!e.getMessage.contains("Address already in use")) {
throw e
}
logInfo("Could not bind on port: " + (tryPort))
}
case e: Exception => throw e
}
}
}

def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(localPort)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
port = server.getConnectors()(0).getLocalPort()
(server, port) = startWithIncrements(localPort, 3)
}
}

Expand Down

0 comments on commit cad16da

Please sign in to comment.