Skip to content

Commit

Permalink
startServiceOnPort will use a SparkConf arg
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Jan 12, 2015
1 parent 29b751b commit f450cd1
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[spark] class HttpServer(
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName, new SparkConf())
server = actualServer
port = actualPort
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo(s"Running Spark version $SPARK_VERSION")

private[spark] val conf = config.clone()
conf.getOption("spark.port.maxRetries")
.foreach(portRetriesConf => System.setProperty("spark.port.maxRetries", portRetriesConf))
conf.validateSettings()

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, serverChannel.socket.getLocalPort)
}
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name, conf)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
}
}

val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName, conf)
ServerInfo(server, boundPort, collection)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, name)
Utils.startServiceOnPort(port, startService, name, conf)
}

private def doCreateActorSystem(
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1692,12 +1692,12 @@ private[spark] object Utils extends Logging {
/**
* Default maximum number of retries when binding to a port before giving up.
*/
lazy val portMaxRetries: Int = {
def portMaxRetries(conf: SparkConf): Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
} else {
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
}
}

Expand All @@ -1715,8 +1715,9 @@ private[spark] object Utils extends Logging {
startPort: Int,
startService: Int => (T, Int),
serviceName: String = "",
maxRetries: Int = portMaxRetries): (T, Int) = {
conf: SparkConf): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
Expand Down

0 comments on commit f450cd1

Please sign in to comment.