[SPARK-19755][Mesos] Blacklist is always active for MesosCoarseGrainedSchedulerBackend #20640
MesosCoarseGrainedSchedulerBackend.scala

@@ -62,9 +62,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
     new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
 
-  // Blacklist a slave after this many failures
-  private val MAX_SLAVE_FAILURES = 2
-
   private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
   private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
@@ -571,7 +568,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
       numExecutors < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
+      // nodeBlacklist() currently only gets updated based on failures in spark tasks.
+      // If a mesos task fails to even start -- that is,
+      // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated
+      // see SPARK-24567 for details
+      !scheduler.nodeBlacklist().contains(offerHostname) &&
       meetsPortRequirements &&
       satisfiesLocality(offerHostname)
     }
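As the review discussion later in this page points out, `nodeBlacklist()` only contains hosts when Spark's blacklisting is enabled, which it is not by default. A minimal sketch, not part of this diff, of how an application would opt in so the new check actually declines offers: `spark.blacklist.enabled` and `spark.blacklist.application.maxFailedExecutorsPerNode` are the standard Spark 2.x blacklist settings, while the app name and master URL are made up.

```scala
import org.apache.spark.SparkConf

object BlacklistEnabledConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("mesos-blacklist-example")          // illustrative
      .setMaster("mesos://zk://zk1:2181/mesos")       // hypothetical Mesos master URL
      .set("spark.cores.max", "8")
      // Without this, TaskSchedulerImpl.nodeBlacklist() stays empty and no offer is declined.
      .set("spark.blacklist.enabled", "true")
      // Application-level node blacklisting kicks in after this many executors on a node
      // are blacklisted -- roughly the role the hardcoded MAX_SLAVE_FAILURES = 2 used to play.
      .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
    println(conf.toDebugString)
  }
}
```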
@@ -648,14 +649,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalGpusAcquired -= gpus
           gpusByTaskId -= taskId
         }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
         if (TaskState.isFailed(state)) {
-          slave.taskFailures += 1
-
-          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
-              "is Spark installed on it?")
-          }
+          logError(s"Mesos task $taskId failed on Mesos slave $slaveId.")
         }
         executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node

Review thread on the removed logInfo:

Is it a concern to lose this error message? (I don't know anything about Mesos, but it does seem potentially useful.)

@kayousterhout BlacklistTracker has its own logging for blacklisted nodes; won't that be enough? On the other hand, if blacklisting is disabled, which is the default, we lose this information.

@IgorBerman Yes, in the default case it would be nice to keep this information about a task failing, especially if it fails repeatedly.
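The thread above asks whether losing the "is Spark installed on it?" hint matters when blacklisting is disabled. A hypothetical follow-up sketch, not part of this PR: a purely informational per-slave failure counter that keeps the hint in the driver log without declining any offers. The class name, the threshold, and the idea of calling it from statusUpdate() are all assumptions.

```scala
import scala.collection.mutable
import org.slf4j.LoggerFactory

// Hypothetical helper: counts failed Mesos tasks per slave purely for logging.
class SlaveFailureLogger(threshold: Int = 2) {
  private val log = LoggerFactory.getLogger(classOf[SlaveFailureLogger])
  private val failures = mutable.HashMap[String, Int]().withDefaultValue(0)

  // Would be called from statusUpdate() when TaskState.isFailed(state);
  // it only logs and never influences offer acceptance.
  def onTaskFailed(slaveId: String): Unit = {
    failures(slaveId) += 1
    if (failures(slaveId) >= threshold) {
      log.warn(s"Mesos slave $slaveId has had ${failures(slaveId)} failed tasks; " +
        "is Spark installed on it?")
    }
  }
}
```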
@@ -798,7 +793,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private class Slave(val hostname: String) {
     val taskIDs = new mutable.HashSet[String]()
-    var taskFailures = 0
     var shuffleRegistered = false
   }
MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+    setBackend()
+
+    // launches a task on a valid offer on slave s1
+    val minMem = backend.executorMemory(sc) + 1024
+    val minCpu = 4
+    val offer1 = Resources(minMem, minCpu)
+    offerResources(List(offer1))
+    verifyTaskLaunched(driver, "o1")
+
+    // for any reason the executor (aka mesos task) failed on s1
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
+
+    val offer2 = Resources(minMem, minCpu)
+    // Offer resources from the same slave
+    offerResources(List(offer2))
+    // but since it's blacklisted the offer is declined
+    verifyDeclinedOffer(driver, createOfferId("o1"))
+  }
+
   test("mesos supports spark.executor.cores") {
     val executorCores = 4
     setBackend(Map("spark.executor.cores" -> executorCores.toString))

Review comment on the nodeBlacklist() stub:

Just to reiterate my point above: in many cases, having an executor fail will not lead to nodeBlacklist() being updated.

Review comments on verifyDeclinedOffer:

Will this actually pass? I thought it wouldn't, because the filtering is done inside …

Ah, never mind. I took another look at the code and now I see how this works.
@@ -790,6 +812,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set[String]())
 
     externalShuffleClient = mock[MesosExternalShuffleClient]
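A standalone sketch of why this default stub belongs in the shared mock setup: an un-stubbed Mockito mock returns null for `nodeBlacklist()`, so the backend's new `contains()` check would throw a NullPointerException in every existing test. The `SchedulerLike` trait is a stand-in for `TaskSchedulerImpl`, used only to keep the sketch self-contained.

```scala
import org.mockito.Mockito.{mock, when}

// Stand-in for TaskSchedulerImpl, just to make the sketch self-contained.
trait SchedulerLike {
  def nodeBlacklist(): Set[String]
}

object DefaultStubSketch extends App {
  val scheduler = mock(classOf[SchedulerLike])

  // Without this stub, nodeBlacklist() returns null and .contains(...) would NPE.
  when(scheduler.nodeBlacklist()).thenReturn(Set.empty[String])

  assert(!scheduler.nodeBlacklist().contains("s1"))
  println("blacklist check is safe with the default stub")
}
```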
Review comments:

I just want to make really sure everybody understands the big change in behavior here: nodeBlacklist() currently only gets updated based on failures in Spark tasks. If a Mesos task fails to even start -- that is, if a Spark executor fails to launch on a node -- nodeBlacklist() does not get updated. So you could have a node that is misconfigured somehow, and after this change you might end up repeatedly trying to launch executors on it, with the executor failing to start each time. That holds even if you have blacklisting on. This is SPARK-16630 for the non-Mesos case. That is being actively worked on now; however, the work there will probably have to be YARN-specific, so there will still be follow-up work to get the same behavior for Mesos after that is in.

@squito sounds reasonable. In the meantime we have to deal with a limitation on the Mesos side, where the value is hardcoded, so we can move on this incrementally.

Maybe comment on this in the code here and add a JIRA for tracking?

This check looks a little late. Can we decline faster, without calculating everything?
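One way to read this last suggestion, as a standalone sketch with made-up names (not what the PR implements): because Scala's `&&` short-circuits, moving the blacklist membership test ahead of the resource checks means the per-offer resource math is never evaluated for offers from blacklisted hosts.

```scala
// Standalone demonstration of short-circuiting the offer check; all names are illustrative.
object DeclineFastSketch {
  def main(args: Array[String]): Unit = {
    val nodeBlacklist = Set("bad-host")

    def expensiveResourceFit(host: String): Boolean = {
      println(s"computing resource fit for $host")   // only reached for non-blacklisted hosts
      true
    }

    // Blacklist test first: blacklisted offers are declined without any resource math.
    def acceptOffer(host: String): Boolean =
      !nodeBlacklist.contains(host) && expensiveResourceFit(host)

    println(acceptOffer("bad-host"))   // false, and "computing resource fit" is never printed
    println(acceptOffer("good-host"))  // prints the message, then true
  }
}
```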