[SPARK-5006][Deploy]spark.port.maxRetries doesn't work #3841
Conversation
Test build #24890 has started for PR 3841 at commit
Test build #24890 has finished for PR 3841 at commit
Test FAILed.
Test build #24891 has started for PR 3841 at commit
Test build #24891 has finished for PR 3841 at commit
Test PASSed.
@@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  logInfo(s"Running Spark version $SPARK_VERSION")

  private[spark] val conf = config.clone()
  val portRetriesConf = conf.getOption("spark.port.maxRetries")
You could use conf.getOption(...).foreach { portRetriesConf => [...] }, but I'm not sure that it's a huge win.
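As a hedged illustration of that suggestion (assuming `conf` is the SparkConf being assembled in SparkContext, as in the diff above, and that the intent is to mirror the setting into system properties):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// Run the block only when the key is actually set, instead of binding
// the Option to a val and inspecting it afterwards.
conf.getOption("spark.port.maxRetries").foreach { portRetries =>
  // Presumed intent of the patch: copy the value into system properties
  // so code that runs before a SparkEnv exists can still read it.
  sys.props("spark.port.maxRetries") = portRetries
}
```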
I'm a bit confused about this change, since it seems like reading that value from system properties instead of SparkConf breaks our ability to configure it via SparkConf. Can you add a failing unit test which demonstrates the problem / bug that this patch addresses? If this issue has to do with initialization ordering, I'd like to see if we can come up with a cleaner approach which doesn't involve things like unexplained …
I set spark.port.maxRetries …
We could see the third application retried 3 times before giving up, so spark.port.maxRetries didn't work here.
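For context, here is a simplified sketch of the port-retry behaviour being tested; it is a stand-in in the spirit of Spark's Utils.startServiceOnPort, not the real implementation, and `startService` is a hypothetical callback that either binds the port or throws:

```scala
import scala.util.{Success, Try}

// Try successive ports until one binds or the retry budget is exhausted.
def startWithRetries(startPort: Int, maxRetries: Int)
                    (startService: Int => Int): Int = {
  (0 to maxRetries).iterator
    .map { offset => Try(startService((startPort + offset) % 65536)) }
    .collectFirst { case Success(boundPort) => boundPort }
    .getOrElse(throw new RuntimeException(
      s"Could not bind a port after ${maxRetries + 1} attempts"))
}
```

If the configured value never reaches this code path, the fallback default is used regardless of what the user set, which is the symptom described above.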
Test build #24928 has started for PR 3841 at commit
After this patch, the logs of running 3 applications with … I also observed the other ports on the driver and executors, and they acted as expected too.
Test build #24928 has finished for PR 3841 at commit
Test FAILed.
retest this please.
Test build #24943 has started for PR 3841 at commit
Test build #24943 has finished for PR 3841 at commit
Test PASSed.
@andrewor14 Could you take a look?
      .flatMap(_.conf.getOption("spark.port.maxRetries"))
      .map(_.toInt)
      .getOrElse(16)
    sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
I'm worried that changing this to read from System properties rather than the SparkEnv will break standalone mode here. Have you confirmed that spark.port.maxRetries in standalone mode still works with this change?
In general we should prefer using SparkEnv over System properties, because System properties have all kinds of gotchas. There's a reason we created SparkEnv in the first place.
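To make the contrast concrete, a small sketch of the two lookup styles under discussion; the SparkEnv form mirrors the removed lines in the diff above, with 16 as the fallback default shown there:

```scala
import org.apache.spark.SparkEnv

// Pre-patch: resolve through the SparkEnv's conf when an env exists.
val viaSparkEnv: Int = Option(SparkEnv.get)
  .flatMap(_.conf.getOption("spark.port.maxRetries"))
  .map(_.toInt)
  .getOrElse(16)

// Patch: resolve through JVM system properties, which only works if
// something has copied the setting into sys.props beforehand.
val viaSysProps: Int = sys.props.get("spark.port.maxRetries")
  .map(_.toInt)
  .getOrElse(16)
```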
Yes, we should read from the conf, not from sys.props directly.
@WangTaoTheTonic I understand the problem you're observing and think it's related specifically to YARN. Without being super familiar with YARN, I think the approach this patch should take is to send the config option via a system property with a -D flag to the executor, and immediately put that System property into a stub SparkEnv object. When the real SparkEnv object comes over the wire, the two can be merged (or maybe the first can be dropped). This way we don't have to change to reading from System properties in other places, like …
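A rough sketch of the forwarding step described here; `executorJavaOpts` is a hypothetical helper rather than an actual method in the YARN backend:

```scala
import org.apache.spark.SparkConf

// Forward spark.port.maxRetries to the executor JVM as a -D flag so it
// is visible there before the real SparkEnv arrives over the wire.
def executorJavaOpts(conf: SparkConf): Seq[String] =
  conf.getOption("spark.port.maxRetries")
    .map(value => s"-Dspark.port.maxRetries=$value")
    .toSeq
```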
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
  /**
   * Return whether the given config is a Spark port config.
   */
  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
You are misunderstanding what this method does. It looks for spark.*.port intentionally and should not match spark.port.maxRetries.
We should send spark.port.maxRetries to the executor before it is launched; then the config will be used.
I understand that, but this currently matches something like spark.portable.mushroom, which has nothing to do with Spark ports. Maybe instead you want to do something like:
name.matches("spark\\..*\\.port") | name.startsWith("spark.port.")
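Spelled out as a small self-check (note the suggestion above uses a single `|`, which a later comment corrects to the short-circuit `||`):

```scala
def isSparkPortConf(name: String): Boolean =
  name.matches("spark\\..*\\.port") || name.startsWith("spark.port.")

assert(isSparkPortConf("spark.driver.port"))        // a spark.*.port key
assert(isSparkPortConf("spark.port.maxRetries"))    // a spark.port.* key
assert(!isSparkPortConf("spark.portable.mushroom")) // matched by the .contains(".port") version
```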
Ok, I changed it, but used a more obvious way.
So @WangTaoTheTonic @JoshRosen does changing the …
Test build #25414 has started for PR 3841 at commit
Test build #25410 has finished for PR 3841 at commit
Test PASSed.
Test build #25414 has finished for PR 3841 at commit
Test PASSed.
@andrewor14 Updated and did a simple test; it works.
@@ -57,7 +57,7 @@ private[spark] class HttpServer(
  } else {
    logInfo("Starting HTTP Server")
    val (actualServer, actualPort) =
      Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
      Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
We should pass this a conf that already exists instead of creating a new one. This requires changing the constructors of HttpServer and HttpFileServer. I believe there is already a conf everywhere else.
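A minimal sketch of that refactor; the class shape is illustrative rather than HttpServer's real signature:

```scala
import org.apache.spark.SparkConf

// Accept an existing SparkConf in the constructor instead of creating a
// fresh one at the call site, so settings the user configured are seen.
class HttpServer(conf: SparkConf, requestedPort: Int, serverName: String) {
  def start(): Unit = {
    // where the server is started, the shared conf is threaded through:
    // Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
  }
}
```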
@WangTaoTheTonic this looks much better than before. My new comments are fairly minor.
Test build #25453 has started for PR 3841 at commit
Test build #25453 has finished for PR 3841 at commit
Test PASSed.
   */
  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
  def isSparkPortConf(name: String): Boolean = {
    (name.startsWith("spark.") && name.endsWith(".port")) | name.startsWith("spark.port.")
this needs to be ||!
I will fix this myself.
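For the record, the predicate as it reads after that fix:

```scala
def isSparkPortConf(name: String): Boolean = {
  (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
```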
Ok, LGTM. I will fix the last batch of comments myself when I merge this into master. Thanks.
@andrewor14 Should this be backported to 1.2.1?
https://issues.apache.org/jira/browse/SPARK-5006

I think the issue was introduced in #1777. I haven't dug into Mesos's backend yet; maybe the same logic should be added there as well.

Author: WangTaoTheTonic <[email protected]>
Author: WangTao <[email protected]>

Closes #3841 from WangTaoTheTonic/SPARK-5006 and squashes the following commits:

8cdf96d [WangTao] indent thing
2d86d65 [WangTaoTheTonic] fix line length
7cdfd98 [WangTaoTheTonic] fit for new HttpServer constructor
61a370d [WangTaoTheTonic] some minor fixes
bc6e1ec [WangTaoTheTonic] rebase
67bcb46 [WangTaoTheTonic] put conf at 3rd position, modify suite class, add comments
f450cd1 [WangTaoTheTonic] startServiceOnPort will use a SparkConf arg
29b751b [WangTaoTheTonic] rebase as ExecutorRunnableUtil changed to ExecutorRunnable
396c226 [WangTaoTheTonic] make the grammar more like scala
191face [WangTaoTheTonic] invalid value name
62ec336 [WangTaoTheTonic] spark.port.maxRetries doesn't work

Conflicts:
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
Ok, I also put it in branch-1.2.