From 61f3d1718072c252298b6d8ddcca333d1cf122a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?= Date: Wed, 13 Jun 2018 11:10:44 +0200 Subject: [PATCH] remove NodeBlacklistRatio --- .../spark/scheduler/BlacklistTracker.scala | 15 ++++---- .../spark/scheduler/TaskSchedulerImpl.scala | 4 +- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../apache/spark/HeartbeatReceiverSuite.scala | 3 +- .../StandaloneDynamicAllocationSuite.scala | 3 +- .../scheduler/BlacklistTrackerSuite.scala | 8 ++-- ...bernetesClusterSchedulerBackendSuite.scala | 2 +- ...osCoarseGrainedSchedulerBackendSuite.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 6 ++- .../spark/deploy/yarn/YarnAllocator.scala | 11 +++--- .../yarn/YarnAllocatorBlacklistTracker.scala | 33 +++------------- .../org/apache/spark/deploy/yarn/config.scala | 9 ----- .../cluster/YarnSchedulerBackend.scala | 6 +-- .../YarnAllocatorBlacklistTrackerSuite.scala | 38 +++++-------------- .../deploy/yarn/YarnAllocatorSuite.scala | 24 ++++++------ .../cluster/YarnSchedulerBackendSuite.scala | 9 ++--- 17 files changed, 64 insertions(+), 113 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 1ad4f71b4a41e..30cf75d43ee09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -77,7 +77,7 @@ private[scheduler] class BlacklistTracker ( * An immutable copy of the set of nodes that are currently blacklisted. Kept in an * AtomicReference to make [[nodeBlacklist()]] thread-safe. */ - private val _nodeBlacklist = new AtomicReference[Map[String, Long]](Map()) + private val _nodeBlacklist = new AtomicReference[Set[String]](Set()) /** * Time when the next blacklist will expire. Used as a * shortcut to avoid iterating over all entries in the blacklist when none will have expired. @@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker ( nodeIdToBlacklistExpiryTime.remove(node) listenerBus.post(SparkListenerNodeUnblacklisted(now, node)) } - _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap) + _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) } updateNextExpiryTime() } @@ -196,7 +196,7 @@ private[scheduler] class BlacklistTracker ( nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists) listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1)) - _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap) + _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) killExecutorsOnBlacklistedNode(host) updateNextExpiryTime() } @@ -258,7 +258,7 @@ private[scheduler] class BlacklistTracker ( s"executors blacklisted: ${blacklistedExecsOnNode}") nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) - _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap) + _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) killExecutorsOnBlacklistedNode(node) } } @@ -270,11 +270,10 @@ private[scheduler] class BlacklistTracker ( } /** - * Get the full set of blacklisted nodes with expiry dates of the blacklisting. - * Unlike other methods in this class, this *IS* thread-safe -- no lock required - * on a taskScheduler. + * Get the full set of nodes that are blacklisted. 
Unlike other methods in this class, this *IS* + * thread-safe -- no lock required on a taskScheduler. */ - def nodeBlacklist(): Map[String, Long] = { + def nodeBlacklist(): Set[String] = { _nodeBlacklist.get() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8cb576c79defa..0c11806b3981b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl( * Get a snapshot of the currently blacklisted nodes for the entire application. This is * thread-safe -- it can be called without a lock on the TaskScheduler. */ - def nodeBlacklistWithExpiryTimes(): Map[String, Long] = { - blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(Map()) + def nodeBlacklist(): scala.collection.immutable.Set[String] = { + blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set()) } // By default, rack is unknown diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 05879d00d078e..e8b7fc0ef100a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -110,7 +110,7 @@ private[spark] object CoarseGrainedClusterMessages { requestedTotal: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], - nodeBlacklistWithExpiryTimes: Map[String, Long]) + nodeBlacklist: Set[String]) extends CoarseGrainedClusterMessage // Check if an executor was force-killed but for a reason unrelated to the running tasks. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 159c537dd36e0..18c6c0aace56d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -170,7 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) - } else if (scheduler.nodeBlacklistWithExpiryTimes.contains(hostname)) { + } else if (scheduler.nodeBlacklist.contains(hostname)) { // If the cluster manager gives us an executor on a blacklisted node (because it // already started allocating those resources before we informed it of our blacklist, // or if it ignored our blacklist), then we reject that executor immediately. 
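The core change above is that BlacklistTracker now publishes a plain Set[String] of blacklisted hosts through an AtomicReference, so CoarseGrainedSchedulerBackend can consult it when an executor registers without taking a lock on the TaskScheduler. Below is a minimal, self-contained sketch of that publication pattern, not the Spark classes themselves; SimpleNodeBlacklist and the demo object are hypothetical names used only for illustration.

import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the pattern BlacklistTracker uses: the writer
// (which holds the TaskScheduler lock) publishes a fresh immutable Set, and
// any reader thread can take a consistent snapshot without locking.
class SimpleNodeBlacklist {
  private val blacklisted = new AtomicReference[Set[String]](Set.empty)

  // Called by the writer whenever the set of blacklisted nodes changes.
  def publish(nodes: Set[String]): Unit = blacklisted.set(nodes)

  // Thread-safe read, analogous to BlacklistTracker.nodeBlacklist().
  def nodeBlacklist(): Set[String] = blacklisted.get()
}

object SimpleNodeBlacklistDemo extends App {
  val bl = new SimpleNodeBlacklist
  bl.publish(Set("host1", "host2"))

  // A backend can reject an executor that registers from a blacklisted host,
  // mirroring the scheduler.nodeBlacklist.contains(hostname) check above.
  val hostname = "host1"
  if (bl.nodeBlacklist().contains(hostname)) {
    println(s"Rejecting executor registration from blacklisted node $hostname")
  }
}

Dropping the expiry timestamps from this snapshot is what lets every consumer downstream, from the RequestExecutors message to the cluster-manager backends in the following hunks, work with a simple set of host names.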
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index eaffe1adf69b6..1c87f5c53b177 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -72,7 +72,6 @@ class HeartbeatReceiverSuite sc = spy(new SparkContext(conf)) scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) - when(scheduler.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]()) when(scheduler.sc).thenReturn(sc) heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) @@ -272,7 +271,7 @@ private class FakeSchedulerBackend( protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean]( - RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Map.empty)) + RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty)) } protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 78e1f95ebb46f..27cc47496c805 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -501,8 +501,7 @@ class StandaloneDynamicAllocationSuite // Get "localhost" on a blacklist. val taskScheduler = mock(classOf[TaskSchedulerImpl]) - when(taskScheduler.nodeBlacklistWithExpiryTimes()) - .thenReturn(Map("blacklisted-host" -> Long.MaxValue)) + when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) when(taskScheduler.sc).thenReturn(sc) sc.taskScheduler = taskScheduler diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 7aa5d6403df0f..96c8404327e24 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -177,7 +177,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M "hostA", exec = "1", index = partition, failureReason = "testing") } blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures) - assert(blacklist.nodeBlacklist().keySet === Set()) + assert(blacklist.nodeBlacklist() === Set()) assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set()) assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1")) verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 4)) @@ -191,7 +191,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M "hostA", exec = "2", index = partition, failureReason = "testing") } blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures) - assert(blacklist.nodeBlacklist().keySet === Set("hostA")) + assert(blacklist.nodeBlacklist() === Set("hostA")) assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA")) verify(listenerBusMock).post(SparkListenerNodeBlacklisted(0, "hostA", 2)) assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2")) @@ -202,7 +202,7 @@ class BlacklistTrackerSuite extends 
SparkFunSuite with BeforeAndAfterEach with M val timeout = blacklist.BLACKLIST_TIMEOUT_MILLIS + 1 clock.advance(timeout) blacklist.applyBlacklistTimeout() - assert(blacklist.nodeBlacklist().keySet === Set()) + assert(blacklist.nodeBlacklist() === Set()) assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set()) assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set()) verify(listenerBusMock).post(SparkListenerExecutorUnblacklisted(timeout, "2")) @@ -215,7 +215,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M taskSetBlacklist2.updateBlacklistForFailedTask( "hostA", exec = "1", index = 0, failureReason = "testing") blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures) - assert(blacklist.nodeBlacklist().keySet === Set()) + assert(blacklist.nodeBlacklist() === Set()) assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set()) assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set()) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index eb89b9700fd9f..52f10ade7ed47 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -152,7 +152,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn when(sparkContext.conf).thenReturn(sparkConf) when(sparkContext.listenerBus).thenReturn(listenerBus) when(taskSchedulerImpl.sc).thenReturn(sparkContext) - when(taskSchedulerImpl.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]()) + when(taskSchedulerImpl.nodeBlacklist).thenReturn(Set[String]()) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations) when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture())) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 1445d44edc045..b790c7cd27794 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -789,7 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.nodeBlacklistWithExpiryTimes).thenReturn(Map[String, Long]()) + when(taskScheduler.nodeBlacklist).thenReturn(Set[String]()) when(taskScheduler.sc).thenReturn(sc) externalShuffleClient = mock[MesosExternalShuffleClient] diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 1234140cc573f..59b0d1cc05f4a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -508,6 +508,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, s"Max number of executor failures ($maxNumExecutorFailures) reached") + } else if (allocator.isAllNodeBlacklisted) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + "Due to executor failures all available nodes are blacklisted") } else { logDebug("Sending progress") allocator.allocateResources() @@ -729,7 +733,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Option(allocator) match { case Some(a) => if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal, - r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklistWithExpiryTimes)) { + r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) { resetAllocatorInterval() } context.reply(true) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 8581973ab8def..fae054e0eea00 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -160,6 +160,8 @@ private[yarn] class YarnAllocator( def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors + def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted + /** * A sequence of pending container requests that have not yet been fulfilled. */ @@ -183,23 +185,22 @@ private[yarn] class YarnAllocator( * @param localityAwareTasks number of locality aware tasks to be used as container placement hint * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as * container placement hint. - * @param schedulerBlacklist blacklisted nodes with expiry times, which is passed in to avoid - * allocating new containers on them. It will be used to update - * the application master's blacklist. + * @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers + * on them. It will be used to update the application master's blacklist. * @return Whether the new requested total is different than the old value. 
*/ def requestTotalExecutorsWithPreferredLocalities( requestedTotal: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], - schedulerBlacklist: Map[String, Long]): Boolean = synchronized { + nodeBlacklist: Set[String]): Boolean = synchronized { this.numLocalityAwareTasks = localityAwareTasks this.hostToLocalTaskCounts = hostToLocalTaskCount if (requestedTotal != targetNumExecutors) { logInfo(s"Driver requested a total number of $requestedTotal executor(s).") targetNumExecutors = requestedTotal - allocatorBlacklistTracker.setSchedulerBlacklistedNodes(schedulerBlacklist) + allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist) true } else { false diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala index 622c60fda2cec..8d5b3a2f29c2e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala @@ -58,14 +58,11 @@ private[spark] class YarnAllocatorBlacklistTracker( private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE) - private val blacklistMaxNodeRatio = - sparkConf.get(YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO) - private val allocatorBlacklist = new HashMap[String, Long]() private var currentBlacklistedYarnNodes = Set.empty[String] - private var schedulerBlacklist = Map.empty[String, Long] + private var schedulerBlacklist = Set.empty[String] private var numClusterNodes = Int.MaxValue @@ -101,35 +98,17 @@ private[spark] class YarnAllocatorBlacklistTracker( } } - def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Map[String, Long]): Unit = { + def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Set[String]): Unit = { this.schedulerBlacklist = schedulerBlacklistedNodesWithExpiry refreshBlacklistedNodes() } + def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes + private def refreshBlacklistedNodes(): Unit = { removeExpiredYarnBlacklistedNodes() - val limit = (numClusterNodes * blacklistMaxNodeRatio).toInt - val allBlacklistedNodes = schedulerBlacklist.keySet ++ allocatorBlacklist.keySet - val nodesToBlacklist = - if (allBlacklistedNodes.size > limit) { - mostRelevantSubsetOfBlacklistedNodes(limit) - } else { - allBlacklistedNodes - } - - synchronizeBlacklistedNodeWithYarn(nodesToBlacklist) - } - - private def mostRelevantSubsetOfBlacklistedNodes(limit: Int) = { - val allBlacklist = schedulerBlacklist ++ allocatorBlacklist - val relevant = allBlacklist - .toSeq - .sortBy(_._2)(Ordering[Long].reverse) - .take(limit) - .map(_._1) - .toSet - logInfo(s"blacklist size limit ($limit) is reached, total count: ${allBlacklist.size}") - relevant + val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet + synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes) } private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 931cec2f76091..129084a86597a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -334,13 
+334,4 @@ package object config { .booleanConf .createWithDefault(false) - private[spark] val YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO = - ConfigBuilder("spark.yarn.blacklist.maxNodeBlacklistRatio") - .doc("The maximum fraction of the cluster nodes that will be blacklisted " + - "for yarn allocations, based on task & allocation failures") - .doubleConf - .checkValue(weight => weight >= 0 && weight <= 1, - "The value of this ratio must be in [0, 1].") - .createWithDefault(0.75) - } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index a227393b11396..63bea3e7a5003 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -128,12 +128,12 @@ private[spark] abstract class YarnSchedulerBackend( } private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = { - val nodeBlacklistWithExpiryTimes: Map[String, Long] = scheduler.nodeBlacklistWithExpiryTimes() + val nodeBlacklist: Set[String] = scheduler.nodeBlacklist() // For locality preferences, ignore preferences for nodes that are blacklisted val filteredHostToLocalTaskCount = - hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklistWithExpiryTimes.contains(k) } + hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) } RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount, - nodeBlacklistWithExpiryTimes) + nodeBlacklist) } /** diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala index e9df854afb9fc..aeac68e6ed330 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala @@ -72,12 +72,12 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers clock.advance(BLACKLIST_TIMEOUT) // trigger synchronisation of blacklisted nodes with YARN - yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map()) + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set()) verify(amClientMock).updateBlacklist(Collections.emptyList(), Arrays.asList("host")) } test("not handling the expiry of scheduler blacklisted nodes") { - yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host1" -> 100, "host2" -> 150)) + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2")) verify(amClientMock) .updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList()) @@ -85,12 +85,11 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers clock.advance(200L) // expired blacklisted nodes (simulating a resource request) - yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host1" -> 100, "host2" -> 150)) + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2")) // no change is communicated to YARN regarding the blacklisting verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList()) } - test("combining scheduler and allocation blacklist") { (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach { _ => { @@ -106,20 +105,19 @@ class 
YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers verify(amClientMock) .updateBlacklist(Arrays.asList("host1"), Collections.emptyList()) - yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host2" -> 100, "host3" -> 150)) + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host2", "host3")) verify(amClientMock) .updateBlacklist(Arrays.asList("host2", "host3"), Collections.emptyList()) clock.advance(10L) - yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map("host3" -> 100, "host4" -> 200)) + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host3", "host4")) verify(amClientMock) .updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2")) } - test("if blacklist size limit is exceeded, prefer to blacklist nodes with the latest expiry") { - yarnBlacklistTracker.setSchedulerBlacklistedNodes( - Map("host1" -> 150, "host2" -> 110, "host3" -> 200)) + test("blacklist all available nodes") { + yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3")) verify(amClientMock) .updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList()) @@ -136,25 +134,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers // the third failure on the host triggers the blacklisting yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4")) - // the blacklistSizeLimit is 3 as numClusterNodes is 4 and BLACKLIST_SIZE_DEFAULT_WEIGHT is 0.75 - // this is why the 3 late expired nodes are chosen from the four nodes of - // "host1" -> 150, "host2" -> 110, "host3" -> 200, "host4" -> 160. - // So "host2" is removed from the previous state and "host4" is added - verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2")) - - clock.advance(10L) - (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach { - _ => { - yarnBlacklistTracker.handleResourceAllocationFailure(Some("host5")) - verify(amClientMock, never()) - .updateBlacklist(Arrays.asList("host5"), Collections.emptyList()) - } - } - - // the third failure on the host triggers the blacklisting - yarnBlacklistTracker.handleResourceAllocationFailure(Some("host5")) - // "host1" -> 150, "host2" -> 110, "host3" -> 200, "host4" -> 160, host4 -> 170 - verify(amClientMock).updateBlacklist(Arrays.asList("host5"), Arrays.asList("host1")) + verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList()) + assert(yarnBlacklistTracker.isAllNodeBlacklisted === true) } - } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7c10c31ea39fd..3f783baed110d 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -200,7 +200,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (0) handler.getPendingAllocate.size should be (4) - handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty) handler.updateResourceRequests() handler.getPendingAllocate.size should be (3) @@ -211,7 +211,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") 
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) - handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty) handler.updateResourceRequests() handler.getPendingAllocate.size should be (1) } @@ -222,7 +222,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (0) handler.getPendingAllocate.size should be (4) - handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty) handler.updateResourceRequests() handler.getPendingAllocate.size should be (3) @@ -232,7 +232,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (2) - handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty) handler.updateResourceRequests() handler.getPendingAllocate.size should be (0) handler.getNumExecutorsRunning should be (2) @@ -248,7 +248,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val container2 = createContainer("host2") handler.handleAllocatedContainers(Array(container1, container2)) - handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty) handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) } val statuses = Seq(container1, container2).map { c => @@ -279,7 +279,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.killExecutor(executorToKill) handler.killExecutor(executorToKill) handler.getNumExecutorsRunning should be (1) - handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty) handler.updateResourceRequests() handler.getPendingAllocate.size should be (1) } @@ -314,7 +314,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val container2 = createContainer("host2") handler.handleAllocatedContainers(Array(container1, container2)) - handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty) val statuses = Seq(container1, container2).map { c => ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) @@ -333,17 +333,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter // to the blacklist. This makes sure we are sending the right updates. 
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) val handler = createAllocator(4, mockAmClient) - handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Map("hostA" -> Long.MaxValue)) + handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA")) verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava) - val blacklistedNodes = Map( - "hostA" -> Long.MaxValue, - "hostB" -> Long.MaxValue + val blacklistedNodes = Set( + "hostA", + "hostB" ) handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), blacklistedNodes) verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava) - handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Map.empty) + handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set.empty) verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index 220319243581c..7fac57ff68abc 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -38,8 +38,7 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc } val ser = new JavaSerializer(sc.conf).newInstance() for { - blacklist <- IndexedSeq(Map.empty[String, Long], - Map("a" -> Long.MaxValue, "b" -> Long.MaxValue, "c" -> Long.MaxValue)) + blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c")) numRequested <- 0 until 10 hostToLocalCount <- IndexedSeq( Map[String, Int](), @@ -47,11 +46,11 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc ) } { yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount) - when(sched.nodeBlacklistWithExpiryTimes()).thenReturn(blacklist) + when(sched.nodeBlacklist()).thenReturn(blacklist) val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested) assert(req.requestedTotal === numRequested) - assert(req.nodeBlacklistWithExpiryTimes === blacklist) - assert(req.hostToLocalTaskCount.keySet.intersect(blacklist.keySet).isEmpty) + assert(req.nodeBlacklist === blacklist) + assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty) // Serialize to make sure serialization doesn't throw an error ser.serialize(req) }
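With the expiry times gone, RequestExecutors carries only host names, and YarnSchedulerBackend.prepareRequestExecutors simply drops those hosts from the locality hints before the request reaches the ApplicationMaster. A rough sketch of that filtering follows, with made-up host names and counts; it assumes only set membership matters, which is what the YarnSchedulerBackendSuite assertion above verifies.

object PrepareRequestExecutorsSketch extends App {
  // Illustrative values only; in Spark these come from
  // TaskSchedulerImpl.nodeBlacklist() and the backend's locality bookkeeping.
  val nodeBlacklist: Set[String] = Set("hostA")
  val hostToLocalTaskCount: Map[String, Int] = Map("hostA" -> 2, "hostB" -> 3)

  // Blacklisted hosts are removed from the locality hints so the AM is never
  // asked to prefer nodes whose executors would be rejected anyway.
  val filteredHostToLocalTaskCount =
    hostToLocalTaskCount.filter { case (host, _) => !nodeBlacklist.contains(host) }

  println(filteredHostToLocalTaskCount)  // Map(hostB -> 3)
}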
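On the YARN side, the spark.yarn.blacklist.maxNodeBlacklistRatio limit is removed outright; instead the allocator tracker keeps the union of the scheduler's blacklist and its own allocation-failure blacklist, and the ApplicationMaster fails the application once that union covers every node YARN reports. The sketch below models only that stopping condition; the class and object names are hypothetical, and the real tracker additionally expires allocator-side entries and pushes blacklist updates to the AMRMClient.

// Hypothetical, simplified model of the isAllNodeBlacklisted condition added
// by this patch.
class AllocatorBlacklistSketch(numClusterNodes: Int) {
  private var schedulerBlacklist = Set.empty[String]  // sent by the driver in RequestExecutors
  private var allocatorBlacklist = Set.empty[String]  // built from container allocation failures

  def setSchedulerBlacklistedNodes(nodes: Set[String]): Unit = schedulerBlacklist = nodes
  def recordAllocationFailure(host: String): Unit = allocatorBlacklist += host

  // The AM gives up once the combined blacklist is at least as large as the cluster.
  def isAllNodeBlacklisted: Boolean =
    (schedulerBlacklist ++ allocatorBlacklist).size >= numClusterNodes
}

object AllocatorBlacklistSketchDemo extends App {
  val tracker = new AllocatorBlacklistSketch(numClusterNodes = 3)
  tracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
  tracker.recordAllocationFailure("host3")
  // At this point the ApplicationMaster would finish with
  // FinalApplicationStatus.FAILED instead of requesting more containers.
  println(tracker.isAllNodeBlacklisted)  // true
}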