Skip to content

Commit

Permalink
SPARK-19755 declining offers from blacklisted slave by BlacklistTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorBerman committed Feb 21, 2018
1 parent f09faf7 commit e2ddc1b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors < executorLimit &&
!scheduler.nodeBlacklist().contains(offerHostname) &&
meetsPortRequirements &&
satisfiesLocality(offerHostname)
}
Expand Down Expand Up @@ -792,4 +793,4 @@ object IdHelper {
// Use atomic values since Spark contexts can be initialized in parallel
private[mesos] val nextSCNumber = new AtomicLong(0)
private[mesos] val startedBefore = new AtomicBoolean(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o2")
}

test("mesos declines offers from blacklisted slave") {
setBackend()

// launches a task on a valid offer on slave s1
val minMem = backend.executorMemory(sc) + 1024
val minCpu = 4
val offer1 = Resources(minMem, minCpu)
offerResources(List(offer1))
verifyTaskLaunched(driver, "o1")

// for any reason executor(aka mesos task) failed on s1
val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
backend.statusUpdate(driver, status)
when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))

val offer2 = Resources(minMem, minCpu)
// Offer resources from the same slave
offerResources(List(offer2))
// but since it's blacklisted the offer is declined
verifyDeclinedOffer(driver, createOfferId("o1"))
}

test("mesos supports spark.executor.cores") {
val executorCores = 4
setBackend(Map("spark.executor.cores" -> executorCores.toString))
Expand Down Expand Up @@ -790,6 +812,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
when(taskScheduler.nodeBlacklist()).thenReturn(Set[String]())

externalShuffleClient = mock[MesosExternalShuffleClient]

Expand Down

0 comments on commit e2ddc1b

Please sign in to comment.