Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-5006][Deploy]spark.port.maxRetries doesn't work #3841

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[spark] class HttpServer(
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass this a conf that already exists instead of creating a new one. This requires changing the constructors of HttpServer and HttpFileServer. I believe there is already a conf everywhere else.

server = actualServer
port = actualPort
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -372,5 +372,5 @@ private[spark] object SparkConf {
/**
* Return whether the given config is a Spark port config.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you'll need to update the docs here to say something like:

Return true if the given config matches either `spark.*.port` or `spark.port.*`.

*/
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are misunderstanding what this method does. It looks for spark.*.port intentionally and should not match spark.port.maxRetries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should send spark.port.maxRetries to the executor before it is launched then the config will be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that, but this currently matches something like spark.portable.mushroom which has nothing to do with Spark ports. Maybe instead you want to do something like:

name.matches("spark\\..*\\.port") | name.startsWith("spark.port.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I changed but used a more obvious way.

}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, serverChannel.socket.getLocalPort)
}
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
}
}

val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
ServerInfo(server, boundPort, collection)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, name)
Utils.startServiceOnPort(port, startService, conf, name)
}

private def doCreateActorSystem(
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1692,15 +1692,12 @@ private[spark] object Utils extends Logging {
/**
* Default maximum number of retries when binding to a port before giving up.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not "Default" anymore

*/
val portMaxRetries: Int = {
def portMaxRetries(conf: SparkConf): Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that this takes in a conf, is it possible to replace these sys.props with conf? Will the tests still pass (I think they should)?

} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
}
}

Expand All @@ -1709,17 +1706,20 @@ private[spark] object Utils extends Logging {
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
*
* @param startPort The initial port to start the service on.
* @param maxRetries Maximum number of retries to attempt.
* A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
* @param conf Used to get maximum number of retries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A SparkConf used to get the maximum number of retries when binding to a port

* @param serviceName Name of the service.
*/
def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
serviceName: String = "",
maxRetries: Int = portMaxRetries): (T, Int) = {
conf: SparkConf,
serviceName: String = ""
): (T, Int) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this up 1 line

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: need extra space in service$serviceString).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serviceString is already prefixed previously.

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to not log this. It'll get fairly noisy if there are many retries.

for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
})._2
}, conf)._2
}

/** Setup and start the streaming context */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
Expand Down Expand Up @@ -106,7 +107,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
})._2
}, new SparkConf())._2
}

def publishData(data: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ class ExecutorRunnable(
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
// uses Akka to connect to the scheduler, the akka settings are needed as well as the
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
sparkConf.getAll.filter { case (k, v) =>
k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note your code, but shouldn't this just be the following for code reuse?

sparkConf.getAll
  .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
  .foreach { ... }

foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

sparkConf.getAkkaConf.
Expand Down