
[SPARK-2635] Fix race condition at SchedulerBackend.isReady in standalone mode #1525

Closed
wants to merge 5 commits into from

Conversation

li-zhihui
Contributor

In SPARK-1946 (PR #900), the configuration spark.scheduler.minRegisteredExecutorsRatio was introduced. However, in standalone mode, there is a race condition where isReady() can return true because totalExpectedExecutors has not been correctly set.

Because the expected number of executors is uncertain in standalone mode, this PR tries to use CPU cores (--total-executor-cores) as the expected resources to judge whether the SchedulerBackend is ready.
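The idea can be sketched as follows. This is an illustrative, self-contained sketch (class and method names are hypothetical, not the actual PR code): in standalone mode the executor count is unknown up front, so readiness is judged against registered cores versus the cores requested via --total-executor-cores.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of a core-based readiness check. In standalone mode the
// number of executors is not known a priori, so we count registered cores
// against the requested total instead.
class ReadinessCheck(totalExpectedCores: Int, minRegisteredRatio: Double) {
  val totalRegisteredCores = new AtomicInteger(0)

  def registerExecutor(cores: Int): Unit = {
    totalRegisteredCores.addAndGet(cores)
  }

  // Ready once enough cores have registered; with totalExpectedCores == 0
  // (spark.cores.max unset) this is immediately true, so nothing waits.
  def sufficientResourcesRegistered(): Boolean =
    totalRegisteredCores.get() >= totalExpectedCores * minRegisteredRatio
}
```

For example, with 8 expected cores and a ratio of 0.5, the backend would report ready as soon as 4 cores have registered.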

@AmplabJenkins

Can one of the admins verify this patch?

@li-zhihui
Contributor Author

@kayousterhout @tgravescs

  if (minRegisteredRatio > 1) minRegisteredRatio = 1
- // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ // Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds).
Contributor

// Submit tasks time(milliseconds) after minRegisteredRatio is reached

Contributor Author

Thanks @markhamstra, but I think the code means: submit tasks after the wait time if minRegisteredRatio has not been reached.

Contributor

Ah, I see -- sorry. Looks like this is what we want? // Submit tasks after maxRegisteredWaitingTime milliseconds if minRegisteredRatio has not yet been reached

Contributor Author

good, thanks @markhamstra

@tgravescs
Contributor

Can you please also file a JIRA for this?

@tgravescs
Contributor

Jenkins, test this please

@SparkQA

SparkQA commented Jul 23, 2014

QA tests have started for PR 1525. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17006/consoleFull

@li-zhihui li-zhihui changed the title Fix race condition at SchedulerBackend.isReady in standalone mode [SPARK-2635] Fix race condition at SchedulerBackend.isReady in standalone mode Jul 23, 2014
@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1525:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17006/consoleFull

  // is equal to at least this value, that is double between 0 and 1.
- var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+ var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
  if (minRegisteredRatio > 1) minRegisteredRatio = 1
Contributor

Looks like minRegisteredRatio is only needed within this class and doesn't need to be a var:

private val minRegisteredRatio = math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))

Contributor

...actually, it doesn't look like it's used at all anymore except in a log message.

Contributor

Sorry, was doing something dumb. Leave it a var but clean up the initialization.


@li-zhihui
Contributor Author

I added a new commit, @tgravescs @markhamstra

  logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-   "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+   "maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime)
Contributor

I'd do these two log messages with string interpolation instead of using +. http://docs.scala-lang.org/overviews/core/string-interpolation.html
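To illustrate the suggestion, here is the same message built both ways (the wait-time value is a made-up placeholder); the s-interpolator keeps the embedded value inline with the text:

```scala
// Placeholder value for illustration only.
val maxRegisteredWaitingTime = 30000

// With '+' concatenation:
val concatenated =
  "SchedulerBackend is ready for scheduling beginning after waiting " +
    "maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime

// With Scala's s-interpolator, the expression sits inside the literal:
val interpolated =
  "SchedulerBackend is ready for scheduling beginning after waiting " +
    s"maxRegisteredResourcesWaitingTime(ms): $maxRegisteredWaitingTime"

// Both produce the same string.
```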

Contributor Author

@markhamstra thanks

@tgravescs
Contributor

Jenkins, this is okay to test

@tgravescs
Contributor

Jenkins, test this please

@SparkQA

SparkQA commented Jul 23, 2014

QA tests have started for PR 1525. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17033/consoleFull

@SparkQA

SparkQA commented Jul 23, 2014

QA results for PR 1525:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17033/consoleFull

@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _

val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+ totalExpectedResources.getAndSet(maxCores.getOrElse(0))
Contributor

Why "getAndSet" here instead of just "set"?
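The distinction can be shown in isolation (variable names here just mirror the diff above): getAndSet atomically stores a new value and returns the previous one, so when that returned value is discarded, a plain set() expresses the intent more directly.

```scala
import java.util.concurrent.atomic.AtomicInteger

val totalExpectedResources = new AtomicInteger(0)

// getAndSet stores 8 and hands back the old value (0) — useful only if
// the caller actually wants the previous value.
val previous = totalExpectedResources.getAndSet(8)

// set just stores; nothing is returned, which matches the intent here.
totalExpectedResources.set(16)
```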

Contributor Author

oops, thanks @kayousterhout

@kayousterhout
Contributor

I find it a bit confusing that "totalRegisteredResources" can refer to cores (in standalone mode) or executors (in Yarn mode). Can we just use different variables in each of the two cases and name them appropriately (so totalRegisteredCores and totalRegisteredExecutors)?

@li-zhihui
Contributor Author

@kayousterhout I guess you mean "totalExpectedResources" (there is no variable "totalRegisteredResources").

Now totalRegisteredCores is totalCoreCount, and totalRegisteredExecutors is totalExecutors.

@li-zhihui
Contributor Author

@kayousterhout I think replacing totalExpectedResources with totalExpectedCores and totalExpectedExecutors is a good idea, thanks.

@li-zhihui
Contributor Author

@tgravescs @kayousterhout I added a new commit.

@li-zhihui
Contributor Author

@tgravescs @kayousterhout can you close this PR before the code freeze of the 1.1 release? Otherwise it would result in an incompatible configuration property name, because the PR renames spark.scheduler.maxRegisteredExecutorsWaitingTime to spark.scheduler.maxRegisteredResourcesWaitingTime.

@kayousterhout
Contributor

I will take a look at this tomorrow.


@@ -40,6 +41,10 @@ private[spark] class YarnClusterSchedulerBackend(
}
// System property can override environment variable.
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
- totalExpectedExecutors.set(numExecutors)
+ totalExpectedExecutors = numExecutors
Contributor

nit: make this one line with the one above


@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- var totalExpectedExecutors = new AtomicInteger(0)
+ var totalExecutors = new AtomicInteger(0)
Contributor

one more nit: can we call this totalRegisteredExecutors?


@pwendell
Contributor

pwendell commented Aug 2, 2014

Hey all, @kayousterhout asked me to look at this. To me, it doesn't make semantic sense to expose spark.scheduler.minRegisteredExecutorsRatio in standalone mode, because applications in standalone mode do not request a fixed number of executors a priori. So my proposal is to remove this feature in standalone mode altogether. The semantics of spark.cores.max is just a maximum. In some cases users run jobs with this set well above the number of available cores (because they decided to run on a smaller cluster), and that is fully supported. If we enforce a minimum, it will cause all jobs to hang for those users.

If users in standalone mode want to wait, for now they should add their own code to sleep until the desired number of executors appears. They can do this by calling sc.getExecutorStorageStatus.size(), or we could also add an API called sc.getNumExecutors that does this.
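A minimal sketch of such user-side code, under the assumption that the application can supply an executor-count function (in a real job that might be backed by something like () => sc.getExecutorStorageStatus.size; the helper name and signature here are made up for illustration):

```scala
// Hypothetical helper: poll an executor count until the desired number
// appears or a timeout elapses. Returns true if the count was reached.
def waitForExecutors(currentCount: () => Int,
                     desired: Int,
                     timeoutMs: Long,
                     pollMs: Long = 100): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (currentCount() < desired && System.currentTimeMillis() < deadline) {
    Thread.sleep(pollMs)
  }
  currentCount() >= desired
}
```

An application would call this once after creating its SparkContext, before submitting latency-sensitive stages.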

Also curious what @mateiz and @aarondav think about this. I haven't been following this patch previously.

@pwendell
Contributor

pwendell commented Aug 2, 2014

I guess this is currently disabled by default, so that's good (it won't break behavior), but I still don't think spark.cores.max is really meant to be used this way.

@kayousterhout
Contributor

My preference is also to remove this for standalone mode (as mentioned in the original PR, #900) -- but adding @tgravescs who looked quite a bit at the original PR to see if we're forgetting something important here!

@li-zhihui
Contributor Author

Maybe we should consider this feature in standalone mode and Mesos mode together.
Is it necessary in Mesos mode? #1462
@tnachen

@tnachen
Contributor

tnachen commented Aug 2, 2014

If what @pwendell said about spark.cores.max is true, then I think we probably need to rethink how to approach this regardless of what mode we're in. I lean towards safety first, since this is just an optimization.
I'm still getting up to speed on scheduling in Spark, but I believe the locality calculation is shared in TaskScheduler, right?

@li-zhihui
Contributor Author

As @pwendell says, the configuration spark.scheduler.minRegisteredExecutorsRatio is disabled in standalone mode by default. And in the worst case, it sleeps for spark.scheduler.maxRegisteredResourcesWaitingTime (just like the current recommendation). So my opinion is to keep the configuration in all deploy modes (yarn, standalone, mesos).

@tnachen
Contributor

tnachen commented Aug 4, 2014

I see, the max waiting time does mitigate this. I think it's minimally worth adding a comment on the configuration to warn about the potential hanging, and to note that the max timeout will not let the scheduler wait forever.

@pwendell
Contributor

pwendell commented Aug 7, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 7, 2014

QA tests have started for PR 1525. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18109/consoleFull

@SparkQA

SparkQA commented Aug 7, 2014

QA results for PR 1525:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18109/consoleFull

@tgravescs
Contributor

We should figure out before we release 1.1 which modes we are going to support this on and make sure the config name is good. I'm fine either way, as long as it works on the YARN side.

@kayousterhout
Contributor

I think the verdict here is to leave this feature in, and this patch looks good to me if it looks good to you, Tom.


@tgravescs
Contributor

Changes look good to me.

@pwendell
Contributor

pwendell commented Aug 9, 2014

Okay cool LGTM too... after some thought I'm okay to leave it in. The name right now is a bit awkward, but I don't see a better option at this point. Thanks @kayousterhout and @tgravescs for looking at this.

asfgit pushed a commit that referenced this pull request Aug 9, 2014
…lone mode

In SPARK-1946 (PR #900), the configuration spark.scheduler.minRegisteredExecutorsRatio was introduced. However, in standalone mode, there is a race condition where isReady() can return true because totalExpectedExecutors has not been correctly set.

Because the expected number of executors is uncertain in standalone mode, this PR tries to use CPU cores (--total-executor-cores) as the expected resources to judge whether the SchedulerBackend is ready.

Author: li-zhihui <[email protected]>
Author: Li Zhihui <[email protected]>

Closes #1525 from li-zhihui/fixre4s and squashes the following commits:

e9a630b [Li Zhihui] Rename variable totalExecutors and clean codes
abf4860 [Li Zhihui] Push down variable totalExpectedResources to children classes
ca54bd9 [li-zhihui] Format log with String interpolation
88c7dc6 [li-zhihui] Few codes and docs refactor
41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode
(cherry picked from commit 28dbae8)

Signed-off-by: Patrick Wendell <[email protected]>
@asfgit asfgit closed this in 28dbae8 Aug 9, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
9 participants