[SPARK-5006][Deploy]spark.port.maxRetries doesn't work #3841

Closed
wants to merge 11 commits into from
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils

 private[spark] class HttpFileServer(
+    conf: SparkConf,
     securityManager: SecurityManager,
     requestedPort: Int = 0)
   extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
+    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(message)
  * around a Jetty server.
  */
 private[spark] class HttpServer(
+    conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
     requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
       server = actualServer
       port = actualPort
     }
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
   }

   /**
-   * Return whether the given config is a Spark port config.
+   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = {
+    (name.startsWith("spark.") && name.endsWith(".port")) | name.startsWith("spark.port.")

Contributor: this needs to be ||!

Contributor: I will fix this myself.

+  }
 }
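
For reference, here is a sketch of the predicate with the reviewer's fix applied (logical || rather than the bitwise | in the hunk above); this shows the intended final form and is not a line from this diff:

  def isSparkPortConf(name: String): Boolean = {
    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
  }
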
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -312,7 +312,7 @@ object SparkEnv extends Logging {
     val httpFileServer =
       if (isDriver) {
         val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-        val server = new HttpFileServer(securityManager, fileServerPort)
+        val server = new HttpFileServer(conf, securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server =
+      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
       serverChannel.socket.bind(new InetSocketAddress(port))
       (serverChannel, serverChannel.socket.getLocalPort)
     }
-    Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
+    Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
     serverChannel.register(selector, SelectionKey.OP_ACCEPT)

     val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
       }
     }

-    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
     ServerInfo(server, boundPort, collection)
   }

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
     val startService: Int => (ActorSystem, Int) = { actualPort =>
       doCreateActorSystem(name, host, actualPort, conf, securityManager)
     }
-    Utils.startServiceOnPort(port, startService, name)
+    Utils.startServiceOnPort(port, startService, conf, name)
   }

   private def doCreateActorSystem(
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Expand Up @@ -1692,15 +1692,13 @@ private[spark] object Utils extends Logging {
/**
* Default maximum number of retries when binding to a port before giving up.

Contributor: Not "Default" anymore

    */
-  val portMaxRetries: Int = {
-    if (sys.props.contains("spark.testing")) {
+  def portMaxRetries(conf: SparkConf): Int = {
+    val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
+    if (conf.contains("spark.testing")) {
       // Set a higher number of retries for tests...
-      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
+      maxRetries.getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      maxRetries.getOrElse(16)
     }
   }
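
As a quick illustration of how the reworked helper resolves the retry count (the values are hypothetical, and the calls are written as if made from inside the org.apache.spark package since Utils is private[spark]):

  import org.apache.spark.SparkConf
  import org.apache.spark.util.Utils

  val conf = new SparkConf().set("spark.port.maxRetries", "32")
  Utils.portMaxRetries(conf)                                          // 32: the explicit setting wins
  Utils.portMaxRetries(new SparkConf())                               // 16: default outside tests
  Utils.portMaxRetries(new SparkConf().set("spark.testing", "true"))  // 100: higher default for tests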

@@ -1709,17 +1707,18 @@
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
    * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
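
A minimal sketch of the new calling convention (the service below is made up for the example, and the snippet assumes it runs inside the org.apache.spark package because Utils is private[spark]):

  import java.net.{InetSocketAddress, ServerSocket}

  import org.apache.spark.SparkConf
  import org.apache.spark.util.Utils

  val conf = new SparkConf().set("spark.port.maxRetries", "8")

  // Binds a plain ServerSocket; a port collision surfaces as java.net.BindException,
  // which is what startServiceOnPort expects before it retries on the next port.
  def bindSocket(port: Int): (ServerSocket, Int) = {
    val socket = new ServerSocket()
    socket.bind(new InetSocketAddress(port))
    (socket, socket.getLocalPort)
  }

  // The SparkConf is now threaded through explicitly instead of being read from SparkEnv,
  // so spark.port.maxRetries takes effect even before SparkEnv exists.
  val (socket, boundPort) = Utils.startServiceOnPort(10000, bindSocket, conf, "example service")
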
external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, conf)._2
   }

   /** Setup and start the streaming context */
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils

 class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
@@ -106,7 +107,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, new SparkConf())._2
   }

   def publishData(data: String): Unit = {
@@ -106,7 +106,7 @@ import org.apache.spark.util.Utils
   val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
   /** Jetty server that will serve our classes to worker nodes */
   val classServerPort = conf.getInt("spark.replClassServer.port", 0)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
   private var currentSettings: Settings = initialSettings
   var printResults = true // whether to print result lines
   var totalSilence = false // whether to print anything
@@ -32,7 +32,7 @@ object Main extends Logging {
   val s = new Settings()
   s.processArguments(List("-Yrepl-class-based",
     "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf))
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
   var sparkContext: SparkContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.

@@ -148,12 +148,9 @@ class ExecutorRunnable(
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+    sparkConf.getAll
+      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
+      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
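
As a rough illustration of the consolidated filter (not the actual YARN code), the sketch below keeps only executor-startup settings and turns the survivors into -D options; the simplified predicate and the sample keys are assumptions for the example, and shell escaping via YarnSparkHadoopUtil is omitted:

  // Simplified stand-in for SparkConf.isExecutorStartupConf: here we assume it keeps
  // auth, akka, and port settings (the real rule lives in SparkConf).
  def isExecutorStartupConf(name: String): Boolean = {
    name.startsWith("spark.auth") || name.startsWith("spark.akka") ||
      (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
  }

  // Hypothetical driver-side settings.
  val sparkSettings = Seq(
    "spark.authenticate"   -> "true",
    "spark.akka.frameSize" -> "10",
    "spark.executor.port"  -> "0",
    "spark.app.name"       -> "demo")  // dropped: not a startup conf

  val javaOpts = sparkSettings
    .filter { case (k, _) => isExecutorStartupConf(k) }
    .map { case (k, v) => s"-D$k=$v" }
  // javaOpts == Seq("-Dspark.authenticate=true", "-Dspark.akka.frameSize=10", "-Dspark.executor.port=0")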