
[SPARK-19755][Mesos] Blacklist is always active for MesosCoarseGrainedSchedulerBackend #20640

Closed

Changes from 7 commits
@@ -62,9 +62,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
     new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
 
-  // Blacklist a slave after this many failures
-  private val MAX_SLAVE_FAILURES = 2
-
   private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
   private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
@@ -571,7 +568,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
       numExecutors < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
+      // nodeBlacklist() currently only gets updated based on failures in spark tasks.
+      // If a mesos task fails to even start -- that is,
+      // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated.
+      // See SPARK-24567 for details.
+      !scheduler.nodeBlacklist().contains(offerHostname) &&
Contributor @squito commented:

I just want to make really sure everybody understands the big change in behavior here -- nodeBlacklist() currently only gets updated based on failures in Spark tasks. If a Mesos task fails to even start -- that is, if a Spark executor fails to launch on a node -- nodeBlacklist does not get updated. So you could have a node that is somehow misconfigured, and after this change you might end up repeatedly trying to launch executors on it, with the executor failing to start each time. That holds even if you have blacklisting turned on.

This is SPARK-16630 for the non-Mesos case. That is being actively worked on now -- however, the work there will probably have to be YARN-specific, so there will still be follow-up work to get the same behavior for Mesos after that is in.

Contributor @skonto commented (May 19, 2018):

@squito sounds reasonable. In the meantime we have to deal with a limitation on the Mesos side, where the value is hardcoded, so we can move forward with this incrementally.

Member commented:

Maybe comment on this in the code here and add a JIRA for tracking?

Member commented:

This check happens a little late. Can we decline faster, without calculating everything first?
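A minimal sketch of the reordering this reviewer may have in mind -- hypothetical, not the merged code; the names and signature here are illustrative, not the real canLaunchTask. Since `&&` short-circuits, putting the cheap blacklist lookup first means the resource arithmetic is never evaluated for offers from blacklisted hosts:

```scala
// Hypothetical sketch only -- not Spark's actual canLaunchTask.
object DeclineEarlySketch {
  def canLaunchTask(
      blacklist: Set[String],
      offerHostname: String,
      resourcesSatisfied: () => Boolean): Boolean = {
    // && short-circuits: for a blacklisted host, resourcesSatisfied()
    // (the cpu/mem/port/locality math) is never evaluated.
    !blacklist.contains(offerHostname) && resourcesSatisfied()
  }

  def main(args: Array[String]): Unit = {
    val blacklist = Set("badhost")
    var computed = false
    val ok = canLaunchTask(blacklist, "badhost", () => { computed = true; true })
    assert(!ok && !computed) // declined without doing the resource math
  }
}
```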

       meetsPortRequirements &&
       satisfiesLocality(offerHostname)
   }
@@ -648,14 +649,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         totalGpusAcquired -= gpus
         gpusByTaskId -= taskId
       }
-      // If it was a failure, mark the slave as failed for blacklisting purposes
       if (TaskState.isFailed(state)) {
-        slave.taskFailures += 1
-
-        if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-          logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
Contributor @kayousterhout commented:

Is it a concern to lose this error message? (I don't know anything about Mesos, but it does seem potentially useful?)

Author @IgorBerman commented:

@kayousterhout BlacklistTracker has its own logging concerned with blacklisted nodes -- won't that be enough? On the other hand, if blacklisting is disabled, which is the default, then we would lose this information.

Contributor @kayousterhout commented:

@IgorBerman Yes, in the default case it would be nice to have this information about a task failing, especially if it fails repeatedly.

"is Spark installed on it?")
}
logError(s"Mesos task $taskId failed on Mesos slave $slaveId.")
}
executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
@@ -798,7 +793,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

   private class Slave(val hostname: String) {
     val taskIDs = new mutable.HashSet[String]()
-    var taskFailures = 0
     var shuffleRegistered = false
   }

@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+    setBackend()
+
+    // launch a task on a valid offer on slave s1
+    val minMem = backend.executorMemory(sc) + 1024
+    val minCpu = 4
+    val offer1 = Resources(minMem, minCpu)
+    offerResources(List(offer1))
+    verifyTaskLaunched(driver, "o1")
+
+    // the executor (i.e. the Mesos task) fails on s1 for some reason
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
Contributor @squito commented:

Just to reiterate my point above -- in many cases, having an executor fail will not lead to taskScheduler.nodeBlacklist() changing, as you're doing here.
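A minimal standalone sketch of the gap being described -- the names here are illustrative, not Spark's actual classes. The driver-side blacklist is only fed by failures of running Spark tasks, so an executor that never starts leaves the blacklist untouched and the node keeps receiving launch attempts:

```scala
// Illustrative sketch only -- these names are not Spark's real APIs.
object BlacklistGapSketch {
  // Fed only by failures of *running* Spark tasks.
  val nodeBlacklist = scala.collection.mutable.Set.empty[String]

  def onSparkTaskFailed(host: String): Unit = nodeBlacklist += host

  // The check this PR introduces: decline offers from blacklisted hosts.
  def canLaunchOn(host: String): Boolean = !nodeBlacklist.contains(host)

  def main(args: Array[String]): Unit = {
    // An executor fails to *launch* on "badhost" (e.g. misconfigured node):
    // no Spark task ever ran, so onSparkTaskFailed is never called, and ...
    assert(canLaunchOn("badhost")) // ... we would keep retrying this node.
  }
}
```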


+    val offer2 = Resources(minMem, minCpu)
+    // offer resources from the same slave ...
+    offerResources(List(offer2))
+    // ... but since the host is blacklisted, the offer is declined
+    verifyDeclinedOffer(driver, createOfferId("o1"))
Contributor commented:

Will this actually pass? I thought it wouldn't, because the filtering is done inside buildMesosTasks, which never calls declineOffer on offers that fail canLaunchTask. (That's a separate thing which needs fixing -- you could open another issue for it.)

The same contributor followed up:

Ah, never mind -- I took another look at the code and now I see how this works.

+  }
 
   test("mesos supports spark.executor.cores") {
     val executorCores = 4
     setBackend(Map("spark.executor.cores" -> executorCores.toString))
@@ -790,6 +812,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

     taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set[String]())
 
     externalShuffleClient = mock[MesosExternalShuffleClient]
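The suite-wide stub added above gives every test an empty blacklist, and the new test re-stubs the same call with Set("hosts1"). A minimal sketch of that Mockito pattern, under the assumption of a plain Scala trait standing in for TaskSchedulerImpl (the Sched trait here is illustrative, not Spark code) -- a later when(...).thenReturn(...) on the same invocation replaces the earlier stub:

```scala
import org.mockito.Mockito.{mock, when}

// Illustrative sketch: Sched is a stand-in, not Spark's TaskSchedulerImpl.
object RestubSketch {
  trait Sched { def nodeBlacklist(): Set[String] }

  def main(args: Array[String]): Unit = {
    val sched = mock(classOf[Sched])
    // Suite-wide default: no blacklisted nodes.
    when(sched.nodeBlacklist()).thenReturn(Set.empty[String])
    assert(sched.nodeBlacklist().isEmpty)
    // Per-test override: the later stub replaces the default.
    when(sched.nodeBlacklist()).thenReturn(Set("hosts1"))
    assert(sched.nodeBlacklist() == Set("hosts1"))
  }
}
```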
