diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 30cf75d43ee09..980fbbe516b91 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -371,7 +371,7 @@ private[scheduler] class BlacklistTracker (
}
-private[scheduler] object BlacklistTracker extends Logging {
+private[spark] object BlacklistTracker extends Logging {
private val DEFAULT_TIMEOUT = "1h"
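Widening `BlacklistTracker` to `private[spark]` is what lets the YARN module below reuse the driver-side timeout logic. A minimal sketch of the call this enables, assuming only the existing `getBlacklistTimeout(conf)` signature (the object name `BlacklistTimeoutSketch` is hypothetical):

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.BlacklistTracker

// With private[spark] visibility, YARN code under org.apache.spark can resolve
// the shared timeout instead of re-reading the raw configuration keys itself.
object BlacklistTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.blacklist.timeout", "2h")
    val timeoutMs: Long = BlacklistTracker.getBlacklistTimeout(conf)
    println(s"blacklist timeout: $timeoutMs ms")
  }
}
```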
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d8794e8e551aa..9b90e309d2e04 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -170,8 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
- } else if (scheduler.nodeBlacklist != null &&
- scheduler.nodeBlacklist.contains(hostname)) {
+ } else if (scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
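Dropping the null check assumes `TaskSchedulerImpl.nodeBlacklist` always returns a set, possibly empty, never null; that is why the test suites below now stub it explicitly. A sketch of the invariant the registration path now relies on, using a hypothetical stand-in trait:

```scala
object NodeBlacklistInvariantSketch {
  // Hypothetical stand-in for the relevant slice of TaskSchedulerImpl.
  trait NodeBlacklistSource {
    // Contract relied on above: never null, at worst Set.empty.
    def nodeBlacklist(): Set[String]
  }

  // Membership can now be tested without a null guard.
  def shouldRejectExecutor(scheduler: NodeBlacklistSource, hostname: String): Boolean =
    scheduler.nodeBlacklist().contains(hostname)
}
```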
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 88916488c0def..b705556e54b14 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark
import java.util.concurrent.{ExecutorService, TimeUnit}
-import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -73,6 +72,7 @@ class HeartbeatReceiverSuite
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
+ when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -241,7 +241,7 @@ class HeartbeatReceiverSuite
} === Some(true))
}
- private def getTrackedExecutors: Map[String, Long] = {
+ private def getTrackedExecutors: collection.Map[String, Long] = {
// We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
// so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty))
}
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4dbcbeafbbd9d..575da7205b529 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -411,6 +411,16 @@ To use a custom metrics.properties for the application master and executors, upd
name matches both the include and the exclude pattern, this file will be excluded eventually.
+<tr>
+  <td><code>spark.yarn.blacklist.executor.launch.blacklisting.enabled</code></td>
+  <td>false</td>
+  <td>
+  Flag to enable blacklisting of nodes having YARN resource allocation problems.
+  The error limit for blacklisting can be configured by
+  <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
+  </td>
+</tr>
+
# Important notes
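A REPL-style sketch of wiring up the two knobs documented above (the same keys also work via `--conf` on spark-submit):

```scala
import org.apache.spark.SparkConf

// Enable YARN allocator-level blacklisting and tighten the per-node error limit.
val conf = new SparkConf()
  .set("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "true")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "1")
```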
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f4bd1ee9da6f7..b790c7cd27794 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -789,6 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.nodeBlacklist).thenReturn(Set[String]())
when(taskScheduler.sc).thenReturn(sc)
externalShuffleClient = mock[MesosExternalShuffleClient]
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3d6ee50b070a3..ecc576910db9e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -515,6 +515,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
+ } else if (allocator.isAllNodeBlacklisted) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ "Due to executor failures all available nodes are blacklisted")
} else {
logDebug("Sending progress")
allocator.allocateResources()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ebee3d431744d..fae054e0eea00 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records._
@@ -66,7 +66,8 @@ private[yarn] class YarnAllocator(
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
- resolver: SparkRackResolver)
+ resolver: SparkRackResolver,
+ clock: Clock = new SystemClock)
extends Logging {
import YarnAllocator._
@@ -102,18 +103,14 @@ private[yarn] class YarnAllocator(
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
- // Queue to store the timestamp of failed executors
- private val failedExecutorsTimeStamps = new Queue[Long]()
+ private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
- private var clock: Clock = new SystemClock
-
- private val executorFailuresValidityInterval =
- sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+ private val allocatorBlacklistTracker =
+ new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
@volatile private var targetNumExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
- private var currentNodeBlacklist = Set.empty[String]
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
@@ -149,7 +146,6 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
-
// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -160,26 +156,11 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
- /**
- * Use a different clock for YarnAllocator. This is mainly used for testing.
- */
- def setClock(newClock: Clock): Unit = {
- clock = newClock
- }
-
def getNumExecutorsRunning: Int = runningExecutors.size()
- def getNumExecutorsFailed: Int = synchronized {
- val endTime = clock.getTimeMillis()
+ def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
- while (executorFailuresValidityInterval > 0
- && failedExecutorsTimeStamps.nonEmpty
- && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
- failedExecutorsTimeStamps.dequeue()
- }
-
- failedExecutorsTimeStamps.size
- }
+ def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
/**
* A sequence of pending container requests that have not yet been fulfilled.
@@ -204,9 +185,8 @@ private[yarn] class YarnAllocator(
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
- * @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
- * containers on them. It will be used to update the application master's
- * blacklist.
+ * @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
+ * on them. It will be used to update the application master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
@@ -220,19 +200,7 @@ private[yarn] class YarnAllocator(
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
-
- // Update blacklist infomation to YARN ResouceManager for this application,
- // in order to avoid allocating new Containers on the problematic nodes.
- val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
- val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
- if (blacklistAdditions.nonEmpty) {
- logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
- }
- if (blacklistRemovals.nonEmpty) {
- logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
- }
- amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
- currentNodeBlacklist = nodeBlacklist
+ allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
@@ -268,6 +236,7 @@ private[yarn] class YarnAllocator(
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
+ allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +571,9 @@ private[yarn] class YarnAllocator(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
- // Enqueue the timestamp of failed executor
- failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
+ // All the failures not covered above, e.g. disk failure, or the
+ // executor being killed by the app master or the resource manager.
+ allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
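The window-based counting that `getNumExecutorsFailed` now delegates to is easiest to see with a manual clock. A small sketch, assuming `FailureTracker` as defined in the new file below (`FailureWindowSketch` is hypothetical):

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf
import org.apache.spark.util.ManualClock

object FailureWindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.yarn.executor.failuresValidityInterval", "100s")
    val clock = new ManualClock(0L)
    val tracker = new FailureTracker(conf, clock)

    tracker.registerFailureOnHost("host1")   // failure at t = 0s
    clock.advance(50 * 1000)
    tracker.registerFailureOnHost("host1")   // failure at t = 50s
    assert(tracker.numFailedExecutors == 2)

    clock.advance(60 * 1000)                 // t = 110s: the first failure has expired
    assert(tracker.numFailedExecutors == 1)
  }
}
```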
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
new file mode 100644
index 0000000000000..1b48a0ee7ad32
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.BlacklistTracker
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
+ * and synchronizing the node list to YARN.
+ *
+ * Blacklisted nodes come from two different sources:
+ *
+ * <ul>
+ *   <li> from the scheduler, as task-level blacklisted nodes
+ *   <li> from this class (tracked here), as YARN resource allocation problems
+ * </ul>
+ *
+ * The reason to implement this logic here (and not in the driver) is to avoid possible
+ * delays between synchronizing the blacklisted nodes with YARN and resource allocations.
+ */
+private[spark] class YarnAllocatorBlacklistTracker(
+ sparkConf: SparkConf,
+ amClient: AMRMClient[ContainerRequest],
+ failureTracker: FailureTracker)
+ extends Logging {
+
+ private val blacklistTimeoutMillis = BlacklistTracker.getBlacklistTimeout(sparkConf)
+
+ private val launchBlacklistEnabled = sparkConf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED)
+
+ private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+
+ private val allocatorBlacklist = new HashMap[String, Long]()
+
+ private var currentBlacklistedYarnNodes = Set.empty[String]
+
+ private var schedulerBlacklist = Set.empty[String]
+
+ private var numClusterNodes = Int.MaxValue
+
+ def setNumClusterNodes(numClusterNodes: Int): Unit = {
+ this.numClusterNodes = numClusterNodes
+ }
+
+ def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
+ hostOpt match {
+ case Some(hostname) if launchBlacklistEnabled =>
+ // Failures on an already blacklisted node are not even tracked;
+ // otherwise, such failures could shut down the application, since
+ // resource requests are asynchronous and a late failure response
+ // could exceed MAX_EXECUTOR_FAILURES.
+ if (!schedulerBlacklist.contains(hostname) &&
+ !allocatorBlacklist.contains(hostname)) {
+ failureTracker.registerFailureOnHost(hostname)
+ updateAllocationBlacklistedNodes(hostname)
+ }
+ case _ =>
+ failureTracker.registerExecutorFailure()
+ }
+ }
+
+ private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
+ val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
+ if (failuresOnHost > maxFailuresPerHost) {
+ logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
+ allocatorBlacklist.put(
+ hostname,
+ failureTracker.clock.getTimeMillis() + blacklistTimeoutMillis)
+ refreshBlacklistedNodes()
+ }
+ }
+
+ def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Set[String]): Unit = {
+ this.schedulerBlacklist = schedulerBlacklistedNodesWithExpiry
+ refreshBlacklistedNodes()
+ }
+
+ def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes
+
+ private def refreshBlacklistedNodes(): Unit = {
+ removeExpiredYarnBlacklistedNodes()
+ val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
+ synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
+ }
+
+ private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = {
+ // Update blacklist information to YARN ResourceManager for this application,
+ // in order to avoid allocating new Containers on the problematic nodes.
+ val additions = (nodesToBlacklist -- currentBlacklistedYarnNodes).toList.sorted
+ val removals = (currentBlacklistedYarnNodes -- nodesToBlacklist).toList.sorted
+ if (additions.nonEmpty) {
+ logInfo(s"adding nodes to YARN application master's blacklist: $additions")
+ }
+ if (removals.nonEmpty) {
+ logInfo(s"removing nodes from YARN application master's blacklist: $removals")
+ }
+ amClient.updateBlacklist(additions.asJava, removals.asJava)
+ currentBlacklistedYarnNodes = nodesToBlacklist
+ }
+
+ private def removeExpiredYarnBlacklistedNodes(): Unit = {
+ val now = failureTracker.clock.getTimeMillis()
+ allocatorBlacklist.retain { (_, expiryTime) => expiryTime > now }
+ }
+}
+
+/**
+ * FailureTracker is responsible for tracking executor failures both for each host separately
+ * and for all hosts altogether.
+ */
+private[spark] class FailureTracker(
+ sparkConf: SparkConf,
+ val clock: Clock = new SystemClock) extends Logging {
+
+ private val executorFailuresValidityInterval =
+ sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+ // Queue to store the timestamp of failed executors for each host
+ private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()
+
+ private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
+
+ private def updateAndCountFailures(failedExecutorsWithTimeStamps: mutable.Queue[Long]): Int = {
+ val endTime = clock.getTimeMillis()
+ while (executorFailuresValidityInterval > 0 &&
+ failedExecutorsWithTimeStamps.nonEmpty &&
+ failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
+ failedExecutorsWithTimeStamps.dequeue()
+ }
+ failedExecutorsWithTimeStamps.size
+ }
+
+ def numFailedExecutors: Int = synchronized {
+ updateAndCountFailures(failedExecutorsTimeStamps)
+ }
+
+ def registerFailureOnHost(hostname: String): Unit = synchronized {
+ val timeMillis = clock.getTimeMillis()
+ failedExecutorsTimeStamps.enqueue(timeMillis)
+ val failedExecutorsOnHost =
+ failedExecutorsTimeStampsPerHost.getOrElseUpdate(hostname, mutable.Queue[Long]())
+ failedExecutorsOnHost.enqueue(timeMillis)
+ }
+
+ def registerExecutorFailure(): Unit = synchronized {
+ val timeMillis = clock.getTimeMillis()
+ failedExecutorsTimeStamps.enqueue(timeMillis)
+ }
+
+ def numFailuresOnHost(hostname: String): Int = {
+ failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
+ updateAndCountFailures(failedExecutorsOnHost)
+ }.getOrElse(0)
+ }
+
+}
+
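The synchronization step in `synchronizeBlacklistedNodeWithYarn` reduces to plain set algebra; a REPL-style sketch of the delta computation:

```scala
val currentBlacklistedYarnNodes = Set("host1", "host2")
val nodesToBlacklist = Set("host2", "host3")

// Only the delta is sent to YARN, sorted for deterministic log output.
val additions = (nodesToBlacklist -- currentBlacklistedYarnNodes).toList.sorted // List(host3)
val removals = (currentBlacklistedYarnNodes -- nodesToBlacklist).toList.sorted  // List(host1)
```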
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 1a99b3bd57672..129084a86597a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -328,4 +328,10 @@ package object config {
CACHED_FILES_TYPES,
CACHED_CONF_ARCHIVE)
+ /* YARN allocator-level blacklisting related config entries. */
+ private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
+ ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
}
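A sketch of consuming the new typed entry from code inside the `org.apache.spark.deploy.yarn` package, mirroring the tracker's own `sparkConf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED)` above (the typed `SparkConf.get(entry)` overload is package-private to Spark; the object name is hypothetical):

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config._

object LaunchBlacklistConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Defaults to false, per the docs entry added to running-on-yarn.md above.
    val enabled: Boolean = conf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED)
    println(s"launch blacklisting enabled: $enabled")
  }
}
```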
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
new file mode 100644
index 0000000000000..4f77b9c99dd25
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.ManualClock
+
+class FailureTrackerSuite extends SparkFunSuite with Matchers {
+
+ test("failures expire if validity interval is set") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100L)
+
+ val clock = new ManualClock()
+ val failureTracker = new FailureTracker(sparkConf, clock)
+
+ clock.setTime(0)
+ failureTracker.registerFailureOnHost("host1")
+ failureTracker.numFailuresOnHost("host1") should be (1)
+ failureTracker.numFailedExecutors should be (1)
+
+ clock.setTime(10)
+ failureTracker.registerFailureOnHost("host2")
+ failureTracker.numFailuresOnHost("host2") should be (1)
+ failureTracker.numFailedExecutors should be (2)
+
+ clock.setTime(20)
+ failureTracker.registerFailureOnHost("host1")
+ failureTracker.numFailuresOnHost("host1") should be (2)
+ failureTracker.numFailedExecutors should be (3)
+
+ clock.setTime(30)
+ failureTracker.registerFailureOnHost("host2")
+ failureTracker.numFailuresOnHost("host2") should be (2)
+ failureTracker.numFailedExecutors should be (4)
+
+ clock.setTime(101)
+ failureTracker.numFailuresOnHost("host1") should be (1)
+ failureTracker.numFailedExecutors should be (3)
+
+ clock.setTime(231)
+ failureTracker.numFailuresOnHost("host1") should be (0)
+ failureTracker.numFailuresOnHost("host2") should be (0)
+ failureTracker.numFailedExecutors should be (0)
+ }
+
+ test("failures never expire if validity interval is not set (-1)") {
+ val sparkConf = new SparkConf()
+
+ val clock = new ManualClock()
+ val failureTracker = new FailureTracker(sparkConf, clock)
+
+ clock.setTime(0)
+ failureTracker.registerFailureOnHost("host1")
+ failureTracker.numFailuresOnHost("host1") should be (1)
+ failureTracker.numFailedExecutors should be (1)
+
+ clock.setTime(10)
+ failureTracker.registerFailureOnHost("host2")
+ failureTracker.numFailuresOnHost("host2") should be (1)
+ failureTracker.numFailedExecutors should be (2)
+
+ clock.setTime(20)
+ failureTracker.registerFailureOnHost("host1")
+ failureTracker.numFailuresOnHost("host1") should be (2)
+ failureTracker.numFailedExecutors should be (3)
+
+ clock.setTime(30)
+ failureTracker.registerFailureOnHost("host2")
+ failureTracker.numFailuresOnHost("host2") should be (2)
+ failureTracker.numFailedExecutors should be (4)
+
+ clock.setTime(1000)
+ failureTracker.numFailuresOnHost("host1") should be (2)
+ failureTracker.numFailuresOnHost("host2") should be (2)
+ failureTracker.numFailedExecutors should be (4)
+ }
+
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
new file mode 100644
index 0000000000000..aeac68e6ed330
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.util.Arrays
+import java.util.Collections
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
+import org.apache.spark.internal.config.{BLACKLIST_TIMEOUT_CONF, MAX_FAILED_EXEC_PER_NODE}
+import org.apache.spark.util.ManualClock
+
+class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
+ with BeforeAndAfterEach {
+
+ val BLACKLIST_TIMEOUT = 100L
+ val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
+
+ var amClientMock: AMRMClient[ContainerRequest] = _
+ var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
+ var failureTracker: FailureTracker = _
+ var clock: ManualClock = _
+
+ override def beforeEach(): Unit = {
+ val sparkConf = new SparkConf()
+ sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
+ sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
+ sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
+ clock = new ManualClock()
+
+ amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
+ failureTracker = new FailureTracker(sparkConf, clock)
+ yarnBlacklistTracker =
+ new YarnAllocatorBlacklistTracker(sparkConf, amClientMock, failureTracker)
+ yarnBlacklistTracker.setNumClusterNodes(4)
+ super.beforeEach()
+ }
+
+ test("expiring its own blacklisted nodes") {
+ (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach { _ =>
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+ // "host" must not be blacklisted by these failures, as MAX_FAILED_EXEC_PER_NODE is 2
+ verify(amClientMock, never())
+ .updateBlacklist(Arrays.asList("host"), Collections.emptyList())
+ }
+
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+ // the third failure on the host triggers the blacklisting
+ verify(amClientMock).updateBlacklist(Arrays.asList("host"), Collections.emptyList())
+
+ clock.advance(BLACKLIST_TIMEOUT)
+
+ // trigger synchronisation of blacklisted nodes with YARN
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
+ verify(amClientMock).updateBlacklist(Collections.emptyList(), Arrays.asList("host"))
+ }
+
+ test("not handling the expiry of scheduler blacklisted nodes") {
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList())
+
+ // advance the clock past the expiry time of host1 and host2
+ clock.advance(200L)
+
+ // re-send the same scheduler blacklist (simulating a resource request)
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
+ // no change is communicated to YARN regarding the blacklisting
+ verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+ }
+
+ test("combining scheduler and allocation blacklist") {
+ (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach { _ =>
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
+ // host1 must not be blacklisted by these failures, as MAX_FAILED_EXEC_PER_NODE is 2
+ verify(amClientMock, never())
+ .updateBlacklist(Arrays.asList("host1"), Collections.emptyList())
+ }
+
+ // as this is the third failure on host1 the node will be blacklisted
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("host1"), Collections.emptyList())
+
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host2", "host3"))
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("host2", "host3"), Collections.emptyList())
+
+ clock.advance(10L)
+
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host3", "host4"))
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2"))
+ }
+
+ test("blacklist all available nodes") {
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3"))
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList())
+
+ clock.advance(60L)
+ (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach { _ =>
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4"))
+ // host4 must not be blacklisted by these failures, as MAX_FAILED_EXEC_PER_NODE is 2
+ verify(amClientMock, never())
+ .updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
+ }
+
+ // the third failure on the host triggers the blacklisting
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4"))
+
+ verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
+ yarnBlacklistTracker.isAllNodeBlacklisted should be (true)
+ }
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 525abb6f2b350..3f783baed110d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -59,6 +59,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
var rmClient: AMRMClient[ContainerRequest] = _
+ var clock: ManualClock = _
+
var containerNum = 0
override def beforeEach() {
@@ -66,6 +68,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
rmClient = AMRMClient.createAMRMClient()
rmClient.init(conf)
rmClient.start()
+ clock = new ManualClock()
}
override def afterEach() {
@@ -101,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
appAttemptId,
new SecurityManager(sparkConf),
Map(),
- new MockResolver())
+ new MockResolver(),
+ clock)
}
def createContainer(host: String): Container = {
@@ -332,10 +336,14 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
- handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set("hostA", "hostB"))
+ val blacklistedNodes = Set("hostA", "hostB")
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), blacklistedNodes)
verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
- handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set.empty)
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
}
@@ -353,8 +361,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
test("window based failure executor counting") {
sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
val handler = createAllocator(4)
- val clock = new ManualClock(0L)
- handler.setClock(clock)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)