Skip to content

Commit

Permalink
spark.port.maxRetries doesn't work
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Dec 30, 2014
1 parent 040d6f2 commit 62ec336
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 7 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -372,5 +372,5 @@ private[spark] object SparkConf {
/**
* Return whether the given config is a Spark port config.
*/
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
}
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo(s"Running Spark version $SPARK_VERSION")

private[spark] val conf = config.clone()
val portRetriesConf = conf.getOption("spark.port.maxRetries")
if (portRetriesConf.isDefined) {
System.setProperty("spark.port.maxRetries", portRetriesConf.get)
}
conf.validateSettings()

/**
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1691,15 +1691,12 @@ private[spark] object Utils extends Logging {
/**
* Default maximum number of retries when binding to a port before giving up.
*/
val portMaxRetries: Int = {
lazy val portMaxRetries: Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
}
}

Expand All @@ -1719,6 +1716,7 @@ private[spark] object Utils extends Logging {
serviceName: String = "",
maxRetries: Int = portMaxRetries): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
logInfo(s"Starting service$serviceString on port $port with maximum $maxRetries retries. ")
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ trait ExecutorRunnableUtil extends Logging {
// uses Akka to connect to the scheduler, the akka settings are needed as well as the
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
filter { case (k, v) =>
k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
}.
foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

sparkConf.getAkkaConf.
Expand Down

0 comments on commit 62ec336

Please sign in to comment.