[SPARK-5006][Deploy]spark.port.maxRetries doesn't work #3841
```diff
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
   /**
    * Return whether the given config is a Spark port config.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
 }
```

Review comment: Then you'll need to update the docs here to say something like: […]

Review comment: you are misunderstanding what this method does. It looks for […]

Review comment: We should send […]

Author reply: I understand that, but this currently matches something like […]

Author reply: Ok, I changed but used a more obvious way.
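To make the behavior change concrete, here is a standalone sketch (outside Spark) of the predicate before and after this diff. The helper names `isPortConfOld`/`isPortConfNew` and the sample keys are mine, for illustration only:

```scala
// Predicate as it was before the patch: only keys *ending* in ".port" match
def isPortConfOld(name: String): Boolean =
  name.startsWith("spark.") && name.endsWith(".port")

// Predicate as patched here: any key *containing* ".port" matches
def isPortConfNew(name: String): Boolean =
  name.startsWith("spark.") && name.contains(".port")
```

The key the JIRA is about, `spark.port.maxRetries`, fails the old check (it does not end with `.port`) but passes the new one; note that `contains(".port")` also matches unrelated keys such as a hypothetical `spark.porter.threads`, which appears to be what the reviewer is warning about.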
```diff
@@ -1692,15 +1692,12 @@ private[spark] object Utils extends Logging {
   /**
    * Default maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
+  def portMaxRetries(conf: SparkConf): Int = {
     if (sys.props.contains("spark.testing")) {
       // Set a higher number of retries for tests...
       sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
     }
   }
```

Review comment (on the scaladoc): Not "Default" anymore

Review comment: now that this takes in a conf, is it possible to replace these […]
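The lookup order of the patched `portMaxRetries` can be sketched without Spark as follows; a plain `Map[String, String]` stands in for `SparkConf` and an explicit `testing` flag stands in for the `spark.testing` system property (both substitutions are mine, not Spark's API):

```scala
// Resolution order sketch: explicit spark.port.maxRetries wins; otherwise
// the default is 100 under testing and 16 in normal operation.
def portMaxRetries(conf: Map[String, String], testing: Boolean): Int = {
  val default = if (testing) 100 else 16
  conf.get("spark.port.maxRetries").map(_.toInt).getOrElse(default)
}
```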
```diff
@@ -1709,17 +1706,20 @@ private[spark] object Utils extends Logging {
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf Used to get maximum number of retries.
    * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""
+    ): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
+    logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
```

Review comment (on the closing parenthesis of the parameter list): move this up 1 line

Review comment (on the logInfo line): Typo: need extra space in […]

Review comment: I think it's better to not log this. It'll get fairly noisy if there are many retries.
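The retry loop this method wraps around `startService` can be sketched in isolation. This is a simplified stand-in, not Spark's full implementation: it keeps the two behaviors the hunk shows (port 0 is never incremented; each retry tries `startPort + offset`) and the `fakeService` used below is purely illustrative:

```scala
import java.net.BindException

// Simplified sketch of startServiceOnPort's retry loop.
def startServiceOnPort[T](
    startPort: Int,
    startService: Int => (T, Int),
    maxRetries: Int): (T, Int) = {
  var offset = 0
  while (offset <= maxRetries) {
    // Do not increment the port if startPort is 0 (the "any free port" value)
    val tryPort = if (startPort == 0) startPort else startPort + offset
    try {
      return startService(tryPort) // success: service bound to tryPort
    } catch {
      case _: BindException => offset += 1 // port taken: try the next one
    }
  }
  throw new BindException(
    s"Failed to start service on port $startPort after $maxRetries retries")
}

// Usage: a fake service that refuses the first two ports
val busyPorts = Set(8080, 8081)
def fakeService(port: Int): (String, Int) =
  if (busyPorts(port)) throw new BindException(s"Port $port in use")
  else ("service", port)

val (_, boundPort) = startServiceOnPort(8080, fakeService, maxRetries = 3)
// boundPort is 8082: 8080 and 8081 collide, the third attempt succeeds
```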
```diff
@@ -148,8 +148,9 @@ class ExecutorRunnable(
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+    sparkConf.getAll.filter { case (k, v) =>
+      k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
+    }.
       foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     sparkConf.getAkkaConf.
```

Review comment: Not your code, but shouldn't this just be the following for code reuse? […]
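The effect of this filter change can be sketched without YARN: it forwards `spark.auth*` and `spark.akka*` settings, and now also `spark.port.maxRetries`, to the executor JVM as `-D` system properties. In the sketch below, a plain `Map` and a bare `-Dk=v` rendering stand in for `sparkConf.getAll` and `YarnSparkHadoopUtil.escapeForShell`:

```scala
// Keep only the conf entries that must reach the executor JVM and render
// each as a -D system-property flag.
def executorJavaOpts(conf: Map[String, String]): Seq[String] =
  conf.toSeq.collect {
    case (k, v)
        if k.startsWith("spark.auth") || k.startsWith("spark.akka") ||
          k == "spark.port.maxRetries" =>
      s"-D$k=$v"
  }
```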
Review comment: We should pass this a conf that already exists instead of creating a new one. This requires changing the constructors of `HttpServer` and `HttpFileServer`. I believe there is already a conf everywhere else.