diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index c14764f773982..728c38f1a86b3 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
   /**
    * Return whether the given config is a Spark port config.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 57bc3d4e4ae36..1b6411928db64 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   logInfo(s"Running Spark version $SPARK_VERSION")
 
   private[spark] val conf = config.clone()
+  val portRetriesConf = conf.getOption("spark.port.maxRetries")
+  if (portRetriesConf.isDefined) {
+    System.setProperty("spark.port.maxRetries", portRetriesConf.get)
+  }
   conf.validateSettings()
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0d771baaa6abc..f323870463b1c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1691,15 +1691,12 @@ private[spark] object Utils extends Logging {
   /**
    * Default maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
+  lazy val portMaxRetries: Int = {
     if (sys.props.contains("spark.testing")) {
       // Set a higher number of retries for tests...
       sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
     }
   }
 
@@ -1719,6 +1716,7 @@
       serviceName: String = "",
       maxRetries: Int = portMaxRetries): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries.")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 22d73ecf6d010..205c1e538fe83 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -76,7 +76,9 @@ trait ExecutorRunnableUtil extends Logging {
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
     sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+      filter { case (k, v) =>
+        k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
+      }.
       foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
 
     sparkConf.getAkkaConf.
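Note on the overall flow: the removed lookup in `Utils.portMaxRetries` went through `SparkEnv.get`, which is not yet initialized while the driver's own services are binding their ports, so a user-supplied `spark.port.maxRetries` never took effect there. The patch instead has `SparkContext` mirror the conf entry into a JVM system property, which the now-`lazy` `portMaxRetries` reads on first use. Below is a minimal, self-contained sketch of that pattern; the object and method names (`PortRetriesSketch`, `mirrorIntoSystemProps`) are illustrative and not part of the patch.

```scala
// Sketch (not Spark code) of the conf -> system property -> lazy val flow.
object PortRetriesSketch {
  // Stand-in for what SparkContext now does at startup; foreach is
  // behaviorally equivalent to the patch's isDefined/get pair.
  def mirrorIntoSystemProps(confValue: Option[String]): Unit =
    confValue.foreach(v => System.setProperty("spark.port.maxRetries", v))

  // Mirrors the patched Utils.portMaxRetries: lazy, so the property is read
  // on first use rather than when the enclosing object is initialized.
  lazy val portMaxRetries: Int =
    sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)

  def main(args: Array[String]): Unit = {
    mirrorIntoSystemProps(Some("32"))
    println(portMaxRetries) // 32; a plain val touched earlier could have frozen 16
  }
}
```

On YARN the driver's system property is not visible to executor JVMs, which is presumably why the last hunk forwards `spark.port.maxRetries` as an explicit `-D` option when building the executor command line.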
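The `isSparkPortConf` change is easy to gloss over: switching `endsWith(".port")` to `contains(".port")` is what lets `spark.port.maxRetries` count as a port config in the first place, presumably so it is shipped along with the other port settings. A quick standalone check of the new predicate (copied here for illustration only):

```scala
object IsSparkPortConfCheck extends App {
  // Same predicate as the patched SparkConf.isSparkPortConf.
  def isSparkPortConf(name: String): Boolean =
    name.startsWith("spark.") && name.contains(".port")

  assert(isSparkPortConf("spark.ui.port"))          // matched before and after the change
  assert(isSparkPortConf("spark.port.maxRetries"))  // newly matched: ".port" as a substring
  assert(!isSparkPortConf("spark.executor.memory")) // unrelated keys still excluded
}
```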