Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19755][Mesos] Blacklist is always active for MesosCoarseGrainedSchedulerBackend #20640

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
Expand All @@ -61,9 +61,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2

private val maxCoresOption = conf.get(config.CORES_MAX)

private val executorCoresOption = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
Expand Down Expand Up @@ -378,9 +375,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

logDebug(s"Received ${offers.size} resource offers.")

// nodeBlacklist() currently only gets updated based on failures in spark tasks.
// If a mesos task fails to even start -- that is,
// if a spark executor fails to launch on a node -- nodeBlacklist does not get updated
// see SPARK-24567 for details
val blacklist = scheduler.nodeBlacklist()
val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
val offerAttributes = toAttributeMap(offer.getAttributesList)
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
if (blacklist.contains(offer.getHostname)) {
false
} else {
val offerAttributes = toAttributeMap(offer.getAttributesList)
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
}
}

declineUnmatchedOffers(d, unmatchedOffers)
Expand Down Expand Up @@ -583,7 +589,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements &&
satisfiesLocality(offerHostname)
}
Expand Down Expand Up @@ -659,14 +664,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalGpusAcquired -= gpus
gpusByTaskId -= taskId
}
// If it was a failure, mark the slave as failed for blacklisting purposes
if (TaskState.isFailed(state)) {
slave.taskFailures += 1

if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a concern to lose this error message? (I don't know anything about Mesos but it does seem potentially useful?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kayousterhout BlacklistTracker has its own logging that is concerned with blacklisted nodes; won't that be enough? On the other hand, if blacklisting is disabled (which is the default), then we will lose this information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@IgorBerman Yes, in the default case, it would be nice to have this information about a task failing, especially if it fails repeatedly.

"is Spark installed on it?")
}
logError(s"Mesos task $taskId failed on Mesos slave $slaveId.")
}
executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
Expand Down Expand Up @@ -796,7 +795,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

// Per-slave (Mesos agent) bookkeeping held by the coarse-grained scheduler backend.
private class Slave(val hostname: String) {
// Mesos task IDs (one per launched Spark executor) currently associated with this slave.
val taskIDs = new mutable.HashSet[String]()
// NOTE(review): this line is shown in the diff as removed by this change (SPARK-19755
// drops the backend-local failure counter in favor of BlacklistTracker) — confirm
// against the final file before relying on it.
var taskFailures = 0
// Whether the external shuffle service client has registered for this slave.
var shuffleRegistered = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
import org.apache.spark.scheduler.cluster.mesos.Utils._

class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
Expand Down Expand Up @@ -107,6 +107,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o2")
}

// SPARK-19755: offers from hosts present in the scheduler's node blacklist must be
// declined, instead of the backend keeping its own per-slave failure counter.
// (Reconstructed: the scraped diff had review-comment UI text injected mid-method.)
test("SPARK-19755 mesos declines offers from blacklisted slave") {
  setBackend()

  // Launch a task on a valid offer from slave s1.
  val minMem = backend.executorMemory(sc) + 1024
  val minCpu = 4
  val offer1 = Resources(minMem, minCpu)
  offerResources(List(offer1))
  verifyTaskLaunched(driver, "o1")

  // Simulate the executor (i.e. the Mesos task) failing on s1 for any reason,
  // then have the task scheduler report the host as blacklisted.
  val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
  backend.statusUpdate(driver, status)
  when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))

  // Offer resources from the same slave again...
  val offer2 = Resources(minMem, minCpu)
  offerResources(List(offer2))
  // ...but since the host is blacklisted, the offer is declined.
  verifyDeclinedOffer(driver, createOfferId("o1"), true)
}

test("mesos supports spark.executor.cores") {
val executorCores = 4
setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString))
Expand Down