remove NodeBlacklistRatio
attilapiros committed Jun 13, 2018
1 parent 0e78b38 commit 61f3d17
Showing 17 changed files with 64 additions and 113 deletions.
@@ -77,7 +77,7 @@ private[scheduler] class BlacklistTracker (
* An immutable copy of the set of nodes that are currently blacklisted. Kept in an
* AtomicReference to make [[nodeBlacklist()]] thread-safe.
*/
private val _nodeBlacklist = new AtomicReference[Map[String, Long]](Map())
private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
/**
* Time when the next blacklist will expire. Used as a
* shortcut to avoid iterating over all entries in the blacklist when none will have expired.
@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.remove(node)
listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap)
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
updateNextExpiryTime()
}
@@ -196,7 +196,7 @@ private[scheduler] class BlacklistTracker (

nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap)
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(host)
updateNextExpiryTime()
}
@@ -258,7 +258,7 @@ private[scheduler] class BlacklistTracker (
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap)
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(node)
}
}
@@ -270,11 +270,10 @@ private[scheduler] class BlacklistTracker (
}

/**
* Get the full set of blacklisted nodes with expiry dates of the blacklisting.
* Unlike other methods in this class, this *IS* thread-safe -- no lock required
* on a taskScheduler.
* Get the full set of nodes that are blacklisted. Unlike other methods in this class, this *IS*
* thread-safe -- no lock required on a taskScheduler.
*/
def nodeBlacklist(): Map[String, Long] = {
def nodeBlacklist(): Set[String] = {
_nodeBlacklist.get()
}

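Note: the snapshot pattern used above (publish an immutable Set through an AtomicReference so readers never need the scheduler lock) can be sketched in isolation. This is a minimal, self-contained model of that pattern, not the Spark class itself; the method and host names are illustrative.

import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

object NodeBlacklistSnapshot {
  // Mutable state, only touched while holding a lock (simulated here with synchronized).
  private val nodeIdToBlacklistExpiryTime = mutable.HashMap.empty[String, Long]

  // Immutable snapshot published for lock-free readers, mirroring _nodeBlacklist.
  private val _nodeBlacklist = new AtomicReference[Set[String]](Set.empty)

  def blacklistNode(node: String, expiryTime: Long): Unit = synchronized {
    nodeIdToBlacklistExpiryTime.put(node, expiryTime)
    // Publish only the node names; expiry times stay internal after this commit.
    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
  }

  // Thread-safe read: no lock required, just the latest published Set.
  def nodeBlacklist(): Set[String] = _nodeBlacklist.get()

  def main(args: Array[String]): Unit = {
    blacklistNode("hostA", expiryTime = 60000L)
    println(nodeBlacklist())  // prints Set(hostA)
  }
}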
@@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl(
* Get a snapshot of the currently blacklisted nodes for the entire application. This is
* thread-safe -- it can be called without a lock on the TaskScheduler.
*/
def nodeBlacklistWithExpiryTimes(): Map[String, Long] = {
blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(Map())
def nodeBlacklist(): scala.collection.immutable.Set[String] = {
blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
}

// By default, rack is unknown
@@ -110,7 +110,7 @@ private[spark] object CoarseGrainedClusterMessages {
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
nodeBlacklistWithExpiryTimes: Map[String, Long])
nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage

// Check if an executor was force-killed but for a reason unrelated to the running tasks.
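For illustration only, a simplified stand-in for this message shows why a plain Set[String] is enough once expiry times are no longer shipped to the cluster manager. The class name and demo values below are assumptions, not Spark code.

// Simplified stand-in for the RequestExecutors cluster message.
case class RequestExecutorsSketch(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String])  // was Map[String, Long] of expiry times before this commit

object MessageDemo {
  def main(args: Array[String]): Unit = {
    val msg = RequestExecutorsSketch(
      requestedTotal = 4,
      localityAwareTasks = 0,
      hostToLocalTaskCount = Map.empty,
      nodeBlacklist = Set("hostA"))

    // A receiver only needs membership checks, so a Set is sufficient.
    println(msg.nodeBlacklist.contains("hostA"))  // true
  }
}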
@@ -170,7 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklistWithExpiryTimes.contains(hostname)) {
} else if (scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
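A minimal sketch of the registration check above, assuming only that the scheduler exposes the blacklist as a Set[String]; the result types and method are simplified stand-ins rather than the actual RPC handling.

object ExecutorRegistrationSketch {
  sealed trait RegisterResult
  case object Registered extends RegisterResult
  case class RegisterExecutorFailed(reason: String) extends RegisterResult

  // Mirrors the check in CoarseGrainedSchedulerBackend: reject executors that the
  // cluster manager started on a host we have already blacklisted.
  def register(executorId: String, hostname: String,
               nodeBlacklist: Set[String]): RegisterResult = {
    if (nodeBlacklist.contains(hostname)) {
      RegisterExecutorFailed(s"Executor is blacklisted: $executorId")
    } else {
      Registered
    }
  }

  def main(args: Array[String]): Unit = {
    val blacklist = Set("hostA")
    println(register("1", "hostA", blacklist))  // RegisterExecutorFailed(...)
    println(register("2", "hostB", blacklist))  // Registered
  }
}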
@@ -72,7 +72,6 @@ class HeartbeatReceiverSuite
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]())
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -272,7 +271,7 @@ private class FakeSchedulerBackend(

protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Map.empty))
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty))
}

protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
@@ -501,8 +501,7 @@ class StandaloneDynamicAllocationSuite

// Get "localhost" on a blacklist.
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklistWithExpiryTimes())
.thenReturn(Map("blacklisted-host" -> Long.MaxValue))
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.sc).thenReturn(sc)
sc.taskScheduler = taskScheduler

@@ -177,7 +177,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assert(blacklist.nodeBlacklist().keySet === Set())
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 4))
@@ -191,7 +191,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
assert(blacklist.nodeBlacklist().keySet === Set("hostA"))
assert(blacklist.nodeBlacklist() === Set("hostA"))
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
verify(listenerBusMock).post(SparkListenerNodeBlacklisted(0, "hostA", 2))
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
@@ -202,7 +202,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val timeout = blacklist.BLACKLIST_TIMEOUT_MILLIS + 1
clock.advance(timeout)
blacklist.applyBlacklistTimeout()
assert(blacklist.nodeBlacklist().keySet === Set())
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
verify(listenerBusMock).post(SparkListenerExecutorUnblacklisted(timeout, "2"))
@@ -215,7 +215,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
assert(blacklist.nodeBlacklist().keySet === Set())
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
}
@@ -152,7 +152,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(sparkContext.conf).thenReturn(sparkConf)
when(sparkContext.listenerBus).thenReturn(listenerBus)
when(taskSchedulerImpl.sc).thenReturn(sparkContext)
when(taskSchedulerImpl.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]())
when(taskSchedulerImpl.nodeBlacklist).thenReturn(Set[String]())
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
@@ -789,7 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]())
when(taskScheduler.nodeBlacklist).thenReturn(Set[String]())
when(taskScheduler.sc).thenReturn(sc)

externalShuffleClient = mock[MesosExternalShuffleClient]
@@ -508,6 +508,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else if (allocator.isAllNodeBlacklisted) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Due to executor failures all available nodes are blacklisted")
} else {
logDebug("Sending progress")
allocator.allocateResources()
@@ -729,7 +733,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
Option(allocator) match {
case Some(a) =>
if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklistWithExpiryTimes)) {
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)
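The new branch turns the reporter loop's decision into a three-way check: executor-failure limit reached, all nodes blacklisted, or continue allocating. The sketch below models just that decision; the names and return type are simplified assumptions, not the actual ApplicationMaster code.

object AmReporterSketch {
  sealed trait Action
  case class Fail(reason: String) extends Action
  case object Allocate extends Action

  // Mirrors the order of checks in ApplicationMaster: executor-failure limit first,
  // then the new "all nodes blacklisted" condition, otherwise keep allocating.
  def nextAction(numFailedExecutors: Int,
                 maxNumExecutorFailures: Int,
                 isAllNodeBlacklisted: Boolean): Action = {
    if (numFailedExecutors >= maxNumExecutorFailures) {
      Fail(s"Max number of executor failures ($maxNumExecutorFailures) reached")
    } else if (isAllNodeBlacklisted) {
      Fail("Due to executor failures all available nodes are blacklisted")
    } else {
      Allocate
    }
  }

  def main(args: Array[String]): Unit = {
    println(nextAction(numFailedExecutors = 1, maxNumExecutorFailures = 3,
      isAllNodeBlacklisted = true))   // Fail(... all available nodes are blacklisted)
    println(nextAction(numFailedExecutors = 1, maxNumExecutorFailures = 3,
      isAllNodeBlacklisted = false))  // Allocate
  }
}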
@@ -160,6 +160,8 @@ private[yarn] class YarnAllocator(

def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted

/**
* A sequence of pending container requests that have not yet been fulfilled.
*/
@@ -183,23 +185,22 @@
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
* @param schedulerBlacklist blacklisted nodes with expiry times, which is passed in to avoid
* allocating new containers on them. It will be used to update
* the application master's blacklist.
* @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
* on them. It will be used to update the application master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
schedulerBlacklist: Map[String, Long]): Boolean = synchronized {
nodeBlacklist: Set[String]): Boolean = synchronized {
this.numLocalityAwareTasks = localityAwareTasks
this.hostToLocalTaskCounts = hostToLocalTaskCount

if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(schedulerBlacklist)
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
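A condensed sketch of the synchronized update above: locality hints are always refreshed, while the blacklist is forwarded to the allocator-side tracker only when the requested executor total changes. Everything except the parameter names is a simplified assumption, not the YarnAllocator itself.

object ExecutorTargetSketch {
  private var targetNumExecutors = 0
  private var numLocalityAwareTasks = 0
  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
  private var trackedBlacklist: Set[String] = Set.empty

  // Returns true only when the requested total differs from the current target,
  // mirroring requestTotalExecutorsWithPreferredLocalities.
  def requestTotal(requestedTotal: Int,
                   localityAwareTasks: Int,
                   hostToLocalTaskCount: Map[String, Int],
                   nodeBlacklist: Set[String]): Boolean = synchronized {
    numLocalityAwareTasks = localityAwareTasks
    hostToLocalTaskCounts = hostToLocalTaskCount
    if (requestedTotal != targetNumExecutors) {
      targetNumExecutors = requestedTotal
      trackedBlacklist = nodeBlacklist  // stand-in for setSchedulerBlacklistedNodes
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    println(requestTotal(2, 0, Map.empty, Set("hostA")))  // true, target changed
    println(requestTotal(2, 0, Map.empty, Set("hostB")))  // false, target unchanged
  }
}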
@@ -58,14 +58,11 @@ private[spark] class YarnAllocatorBlacklistTracker(

private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)

private val blacklistMaxNodeRatio =
sparkConf.get(YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO)

private val allocatorBlacklist = new HashMap[String, Long]()

private var currentBlacklistedYarnNodes = Set.empty[String]

private var schedulerBlacklist = Map.empty[String, Long]
private var schedulerBlacklist = Set.empty[String]

private var numClusterNodes = Int.MaxValue

@@ -101,35 +98,17 @@
}
}

def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Map[String, Long]): Unit = {
def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Set[String]): Unit = {
this.schedulerBlacklist = schedulerBlacklistedNodesWithExpiry
refreshBlacklistedNodes()
}

def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes

private def refreshBlacklistedNodes(): Unit = {
removeExpiredYarnBlacklistedNodes()
val limit = (numClusterNodes * blacklistMaxNodeRatio).toInt
val allBlacklistedNodes = schedulerBlacklist.keySet ++ allocatorBlacklist.keySet
val nodesToBlacklist =
if (allBlacklistedNodes.size > limit) {
mostRelevantSubsetOfBlacklistedNodes(limit)
} else {
allBlacklistedNodes
}

synchronizeBlacklistedNodeWithYarn(nodesToBlacklist)
}

private def mostRelevantSubsetOfBlacklistedNodes(limit: Int) = {
val allBlacklist = schedulerBlacklist ++ allocatorBlacklist
val relevant = allBlacklist
.toSeq
.sortBy(_._2)(Ordering[Long].reverse)
.take(limit)
.map(_._1)
.toSet
logInfo(s"blacklist size limit ($limit) is reached, total count: ${allBlacklist.size}")
relevant
val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
}

private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = {
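The core behavioural change is easiest to see side by side: the removed code capped the blacklist at numClusterNodes * maxNodeBlacklistRatio and kept the entries with the latest expiry, while the new code simply unions the scheduler and allocator blacklists and relies on isAllNodeBlacklisted to fail the application when nothing is left. The sketch below reproduces both variants outside Spark for comparison; the expiry values reuse the ones from the test case removed further down.

object BlacklistMergeSketch {
  // Before this commit: blacklist entries carried expiry times, and when the combined
  // blacklist exceeded numClusterNodes * maxNodeBlacklistRatio only the nodes with the
  // latest expiry were kept (mostRelevantSubsetOfBlacklistedNodes).
  def mergeWithRatioLimit(schedulerBlacklist: Map[String, Long],
                          allocatorBlacklist: Map[String, Long],
                          numClusterNodes: Int,
                          maxNodeBlacklistRatio: Double): Set[String] = {
    val limit = (numClusterNodes * maxNodeBlacklistRatio).toInt
    val all = schedulerBlacklist ++ allocatorBlacklist
    if (all.size > limit) {
      all.toSeq.sortBy(_._2)(Ordering[Long].reverse).take(limit).map(_._1).toSet
    } else {
      all.keySet
    }
  }

  // After this commit: no ratio, no expiry-based ranking -- just the union of the
  // scheduler blacklist and the allocator blacklist.
  def mergeAll(schedulerBlacklist: Set[String],
               allocatorBlacklist: Set[String]): Set[String] =
    schedulerBlacklist ++ allocatorBlacklist

  def main(args: Array[String]): Unit = {
    val sched = Map("host1" -> 150L, "host2" -> 110L, "host3" -> 200L)
    val alloc = Map("host4" -> 160L)
    // Old behaviour with 4 cluster nodes and ratio 0.75: limit is 3, so host2
    // (earliest expiry) is dropped from the blacklist.
    println(mergeWithRatioLimit(sched, alloc, numClusterNodes = 4,
      maxNodeBlacklistRatio = 0.75))
    // New behaviour: all four hosts are blacklisted.
    println(mergeAll(sched.keySet, alloc.keySet))
  }
}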
@@ -334,13 +334,4 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO =
ConfigBuilder("spark.yarn.blacklist.maxNodeBlacklistRatio")
.doc("The maximum fraction of the cluster nodes that will be blacklisted " +
"for yarn allocations, based on task & allocation failures")
.doubleConf
.checkValue(weight => weight >= 0 && weight <= 1,
"The value of this ratio must be in [0, 1].")
.createWithDefault(0.75)

}
@@ -128,12 +128,12 @@ private[spark] abstract class YarnSchedulerBackend(
}

private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = {
val nodeBlacklistWithExpiryTimes: Map[String, Long] = scheduler.nodeBlacklistWithExpiryTimes()
val nodeBlacklist: Set[String] = scheduler.nodeBlacklist()
// For locality preferences, ignore preferences for nodes that are blacklisted
val filteredHostToLocalTaskCount =
hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklistWithExpiryTimes.contains(k) }
hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) }
RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount,
nodeBlacklistWithExpiryTimes)
nodeBlacklist)
}

/**
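A small sketch of the filtering done in prepareRequestExecutors: locality preferences for blacklisted hosts are dropped before the executor request is sent. Names other than the two parameters are illustrative.

object LocalityFilterSketch {
  // Mirrors prepareRequestExecutors: drop locality preferences for blacklisted hosts
  // before asking the cluster manager for executors.
  def filteredHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int],
                                   nodeBlacklist: Set[String]): Map[String, Int] =
    hostToLocalTaskCount.filter { case (host, _) => !nodeBlacklist.contains(host) }

  def main(args: Array[String]): Unit = {
    val hints = Map("hostA" -> 3, "hostB" -> 1)
    println(filteredHostToLocalTaskCount(hints, Set("hostA")))  // Map(hostB -> 1)
  }
}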
@@ -72,25 +72,24 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
clock.advance(BLACKLIST_TIMEOUT)

// trigger synchronisation of blacklisted nodes with YARN
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map())
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
verify(amClientMock).updateBlacklist(Collections.emptyList(), Arrays.asList("host"))
}

test("not handling the expiry of scheduler blacklisted nodes") {
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host1" -> 100, "host2" -> 150))
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList())

// advance timer more then host1, host2 expiry time
clock.advance(200L)

// expired blacklisted nodes (simulating a resource request)
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host1" -> 100, "host2" -> 150))
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
// no change is communicated to YARN regarding the blacklisting
verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
}


test("combining scheduler and allocation blacklist") {
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
_ => {
@@ -106,20 +105,19 @@
verify(amClientMock)
.updateBlacklist(Arrays.asList("host1"), Collections.emptyList())

yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host2" -> 100, "host3" -> 150))
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host2", "host3"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host2", "host3"), Collections.emptyList())

clock.advance(10L)

yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host3" -> 100, "host4" -> 200))
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host3", "host4"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2"))
}

test("if blacklist size limit is exceeded, prefer to blacklist nodes with the latest expiry") {
yarnBlacklistTracker.setSchedulerBlacklistedNodes(
Map("host1" -> 150, "host2" -> 110, "host3" -> 200))
test("blacklist all available nodes") {
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList())

@@ -136,25 +134,7 @@
// the third failure on the host triggers the blacklisting
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4"))

// the blacklistSizeLimit is 3 as numClusterNodes is 4 and BLACKLIST_SIZE_DEFAULT_WEIGHT is 0.75
// this is why the 3 late expired nodes are chosen from the four nodes of
// "host1" -> 150, "host2" -> 110, "host3" -> 200, "host4" -> 160.
// So "host2" is removed from the previous state and "host4" is added
verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2"))

clock.advance(10L)
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
_ => {
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host5"))
verify(amClientMock, never())
.updateBlacklist(Arrays.asList("host5"), Collections.emptyList())
}
}

// the third failure on the host triggers the blacklisting
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host5"))
// "host1" -> 150, "host2" -> 110, "host3" -> 200, "host4" -> 160, host4 -> 170
verify(amClientMock).updateBlacklist(Arrays.asList("host5"), Arrays.asList("host1"))
verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
assert(yarnBlacklistTracker.isAllNodeBlacklisted === true)
}

}
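The suites above stub the new Set-based method with Mockito. Below is a self-contained sketch of that stubbing pattern, written against a small hypothetical trait rather than TaskSchedulerImpl, and assuming mockito-core is on the classpath.

import org.mockito.Mockito.{mock, verify, when}

object MockingSketch {
  // A tiny trait standing in for the scheduler-side API the suites stub out.
  trait NodeBlacklistSource {
    def nodeBlacklist(): Set[String]
  }

  def main(args: Array[String]): Unit = {
    // Stub the Set-based method, the same way the updated suites stub
    // taskScheduler.nodeBlacklist() instead of nodeBlacklistWithExpiryTimes.
    val source = mock(classOf[NodeBlacklistSource])
    when(source.nodeBlacklist()).thenReturn(Set("blacklisted-host"))

    println(source.nodeBlacklist().contains("blacklisted-host"))  // true
    verify(source).nodeBlacklist()
  }
}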