[SPARK-16630][YARN] Blacklist a node if executors won't launch on it #21068

Closed · wants to merge 16 commits
@@ -77,7 +77,7 @@ private[scheduler] class BlacklistTracker (
* An immutable copy of the set of nodes that are currently blacklisted. Kept in an
* AtomicReference to make [[nodeBlacklist()]] thread-safe.
*/
private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
private val _nodeBlacklist = new AtomicReference[Map[String, Long]](Map())
/**
* Time when the next blacklist will expire. Used as a
* shortcut to avoid iterating over all entries in the blacklist when none will have expired.
@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.remove(node)
listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
updateNodeBlacklist()
}
updateNextExpiryTime()
}
@@ -196,7 +196,7 @@ private[scheduler] class BlacklistTracker (

nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
updateNodeBlacklist()
killExecutorsOnBlacklistedNode(host)
updateNextExpiryTime()
}
@@ -216,6 +216,10 @@ private[scheduler] class BlacklistTracker (
}
}

private def updateNodeBlacklist(): Unit = {
Contributor:
This function seems unnecessary to me; I don't see it adding any value vs. doing it inline.

Contributor Author:
ok

_nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
}
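For illustration, a minimal sketch of the inlined form being discussed above (hypothetical, not taken from the PR; toMap on the mutable map already yields an immutable snapshot, equivalent to the immutable.Map(... toSeq: _*) construction):

// Sketch: drop the helper and set the snapshot directly at each call site.
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap)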

def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
stageAttemptId: Int,
@@ -258,7 +262,7 @@ private[scheduler] class BlacklistTracker (
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
updateNodeBlacklist()
killExecutorsOnBlacklistedNode(node)
}
}
@@ -273,7 +277,7 @@ private[scheduler] class BlacklistTracker (
* Get the full set of nodes that are blacklisted. Unlike other methods in this class, this *IS*
* thread-safe -- no lock required on a taskScheduler.
*/
def nodeBlacklist(): Set[String] = {
def nodeBlacklist(): Map[String, Long] = {
Contributor:
update the comment here and on TaskSchedulerImpl.nodeBlacklist to explain the value.

_nodeBlacklist.get()
}
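One possible wording for the doc update the reviewer asks for (a sketch only; the same idea would apply to TaskSchedulerImpl.nodeBlacklistWithExpiryTimes):

/**
 * Get all nodes that are currently blacklisted, mapped to the time (in millis) at which
 * the blacklisting of each node expires. Unlike other methods in this class, this *IS*
 * thread-safe -- no lock required on a taskScheduler.
 */
def nodeBlacklist(): Map[String, Long] = {
  _nodeBlacklist.get()
}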

@@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl(
* Get a snapshot of the currently blacklisted nodes for the entire application. This is
* thread-safe -- it can be called without a lock on the TaskScheduler.
*/
def nodeBlacklist(): scala.collection.immutable.Set[String] = {
blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
def nodeBlacklistWithExpiryTimes(): scala.collection.immutable.Map[String, Long] = {
Contributor:
Why not just Map[String, Long]?

I kinda find it odd when I see these types used this way, so unless there's a good reason...

blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Map())
}
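What the reviewer's simplification would roughly look like (a sketch; it assumes Map is not shadowed by a mutable import in TaskSchedulerImpl, so the plain name already refers to the immutable type):

def nodeBlacklistWithExpiryTimes(): Map[String, Long] = {
  blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(Map.empty)
}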

// By default, rack is unknown
@@ -110,7 +110,7 @@ private[spark] object CoarseGrainedClusterMessages {
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
nodeBlacklist: Set[String])
nodeBlacklistWithExpiryTimes: Map[String, Long])
extends CoarseGrainedClusterMessage

// Check if an executor was force-killed but for a reason unrelated to the running tasks.
@@ -170,8 +170,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
} else if (scheduler.nodeBlacklistWithExpiryTimes != null &&
Contributor:
nodeBlacklistWithExpiryTimes is never null right?

Also calling that method twice causes unnecessary computation...

scheduler.nodeBlacklistWithExpiryTimes.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
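A minimal sketch of the change being asked for here (hypothetical; it caches the snapshot in a local val so the method is evaluated once, and drops the null check since the tracker always returns a map):

// Sketch: evaluate the blacklist snapshot once per RegisterExecutor message.
val blacklistedNodes = scheduler.nodeBlacklistWithExpiryTimes()
if (executorDataMap.contains(executorId)) {
  executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  context.reply(true)
} else if (blacklistedNodes.contains(hostname)) {
  // reject the executor immediately, as in the existing branch
}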
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(

protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Map.empty))
}

protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
@@ -501,7 +501,8 @@ class StandaloneDynamicAllocationSuite

// Get "localhost" on a blacklist.
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.nodeBlacklistWithExpiryTimes())
.thenReturn(Map("blacklisted-host" -> Long.MaxValue))
when(taskScheduler.sc).thenReturn(sc)
sc.taskScheduler = taskScheduler

@@ -177,7 +177,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
assert(blacklist.nodeBlacklist().keySet === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 4))
@@ -191,7 +191,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
assert(blacklist.nodeBlacklist() === Set("hostA"))
assert(blacklist.nodeBlacklist().keySet === Set("hostA"))
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
verify(listenerBusMock).post(SparkListenerNodeBlacklisted(0, "hostA", 2))
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
@@ -202,7 +202,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val timeout = blacklist.BLACKLIST_TIMEOUT_MILLIS + 1
clock.advance(timeout)
blacklist.applyBlacklistTimeout()
assert(blacklist.nodeBlacklist() === Set())
assert(blacklist.nodeBlacklist().keySet === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
verify(listenerBusMock).post(SparkListenerExecutorUnblacklisted(timeout, "2"))
@@ -215,7 +215,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
assert(blacklist.nodeBlacklist().keySet === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
}
@@ -729,7 +729,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
Option(allocator) match {
case Some(a) =>
if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklistWithExpiryTimes)) {
resetAllocatorInterval()
}
context.reply(true)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {
Contributor:
Add scaladoc explaining what this does?
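A possible scaladoc along the lines being requested (the wording is a sketch, not from the PR):

/**
 * Tracks executor failures, both globally and per host, and reports how many of them
 * happened within the configured validity interval. Failures older than the interval
 * are dropped and no longer counted.
 */
private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {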


private var clock: Clock = new SystemClock
Contributor:
This should be a constructor argument.
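The constructor-argument variant could look roughly like this (a sketch; the default value is just one way to keep existing call sites unchanged, and it would make the setClock method below redundant):

private[spark] class FailureWithinTimeIntervalTracker(
    sparkConf: SparkConf,
    clock: Clock = new SystemClock) extends Logging {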


private val executorFailuresValidityInterval =
sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)

// Queue to store the timestamp of failed executors for each host
private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

private def getNumFailuresWithinValidityInterval(
Contributor:
It's not really clear what a 'validity interval' is. I think it means that only failures that have happened recently are considered valid? I think it would be clearer to call this getNumFailuresSince(), or getRecentFailureCount() or similar, and explicitly pass in the timestamp the caller wants to consider failures since.

If you do the latter, and drop the endTime argument, then you partly address the issue I raise below about how this mutates state, because getRecentFailureCount() suggests more clearly that it's expecting to take into account the current time.

Contributor Author:
Thanks, then getRecentFailureCount will be the method name, without the endTime argument.

failedExecutorsTimeStampsForHost: mutable.Queue[Long],
endTime: Long): Int = {
while (executorFailuresValidityInterval > 0
&& failedExecutorsTimeStampsForHost.nonEmpty
Contributor:
nit: && goes on the previous line; double indent the following lines to separate them from the body below.

&& failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
Contributor:
This relies on the fact that the clock is monotonic, but if it's a SystemClock it's based on System.currentTimeMillis(), which is not monotonic and can time-travel.

Contributor Author:
This code is coming from YarnAllocator.scala#L175.

As I see it, the common solution in Spark is to use Clock, SystemClock and ManualClock. And here the validity interval is much longer than the adjustment NTP can apply.

failedExecutorsTimeStampsForHost.dequeue()
Contributor:
It's counter-intuitive that this get* method mutates state. If I called

getNumFailuresWithinValidityInterval(foo, 0)
getNumFailuresWithinValidityInterval(foo, 10)
getNumFailuresWithinValidityInterval(foo, 0)

The last call can return something different from the first because all the failures that weren't within 10 - executorFailuresValidityInterval will have been dropped.

Contributor Author:
Ok, I will take your recommendation: drop endTime and rename the method to getRecentFailureCount.

}
failedExecutorsTimeStampsForHost.size
}
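Based on the agreement above, a rough sketch of the renamed method (illustrative only; it reads the current time from the clock instead of taking an endTime parameter):

private def getRecentFailureCount(failureTimestamps: mutable.Queue[Long]): Int = {
  val now = clock.getTimeMillis()
  // Drop failures that fall outside the validity interval, then count the rest.
  while (executorFailuresValidityInterval > 0 &&
      failureTimestamps.nonEmpty &&
      failureTimestamps.head < now - executorFailuresValidityInterval) {
    failureTimestamps.dequeue()
  }
  failureTimestamps.size
}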

/**
* Use a different clock. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}

def getNumExecutorsFailed: Int = synchronized {
getNumFailuresWithinValidityInterval(failedExecutorsTimeStamps, clock.getTimeMillis())
}

def registerFailureOnHost(hostname: String): Unit = synchronized {
val timeMillis = clock.getTimeMillis()
failedExecutorsTimeStamps.enqueue(timeMillis)
val failedExecutorsOnHost =
failedExecutorsTimeStampsPerHost.getOrElse(hostname, {
val failureOnHost = mutable.Queue[Long]()
failedExecutorsTimeStampsPerHost.put(hostname, failureOnHost)
failureOnHost
})
failedExecutorsOnHost.enqueue(timeMillis)
}

def registerExecutorFailure(): Unit = synchronized {
val timeMillis = clock.getTimeMillis()
failedExecutorsTimeStamps.enqueue(timeMillis)
}

def getNumExecutorFailuresOnHost(hostname: String): Int =
Contributor:
nit: add braces in multi-line methods.

Also, I know the YARN module is generally this way, but this seems like a good chance to start to use the more usual Spark convention of not naming methods with "get" (and also take the chance to make the name shorter). e.g., numFailuresOnHost or some variation of that.

failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
getNumFailuresWithinValidityInterval(failedExecutorsOnHost, clock.getTimeMillis())
}.getOrElse(0)

}
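Following the naming nit, a hypothetical version with braces and without the get prefix (same behaviour as the method above):

def numFailuresOnHost(hostname: String): Int = {
  failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
    getNumFailuresWithinValidityInterval(failedExecutorsOnHost, clock.getTimeMillis())
  }.getOrElse(0)
}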
@@ -42,7 +42,7 @@ import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,18 +102,14 @@ private[yarn] class YarnAllocator(
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

// Queue to store the timestamp of failed executors
private val failedExecutorsTimeStamps = new Queue[Long]()
private val failureWithinTimeIntervalTracker = new FailureWithinTimeIntervalTracker(sparkConf)

private var clock: Clock = new SystemClock

private val executorFailuresValidityInterval =
sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
private val allocatorBlacklistTracker =
new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureWithinTimeIntervalTracker)

@volatile private var targetNumExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)

private var currentNodeBlacklist = Set.empty[String]

// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
@@ -149,7 +145,6 @@ private[yarn] class YarnAllocator(

private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)


// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

@@ -160,26 +155,12 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

/**
* Use a different clock for YarnAllocator. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
def getNumExecutorsFailed: Int = failureWithinTimeIntervalTracker.getNumExecutorsFailed

while (executorFailuresValidityInterval > 0
&& failedExecutorsTimeStamps.nonEmpty
&& failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
failedExecutorsTimeStamps.dequeue()
}

failedExecutorsTimeStamps.size
}
def getFailureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker =
Contributor:
This name is kinda awkward. How about just getFailureTracker or something simple along those lines? That could be applied to the class name also.

(e.g. if you add any logic that is not time-based to that class, then its name becomes inaccurate.)

Also instead of a getter you could just make the field public (it's already a val).

failureWithinTimeIntervalTracker
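A sketch of the reviewer's alternatives (hypothetical names, assuming the class were also renamed to FailureTracker as suggested):

// Option 1: make the field itself accessible instead of adding a getter.
val failureTracker = new FailureTracker(sparkConf)

// Option 2: keep a getter, but with a shorter name.
def getFailureTracker: FailureTracker = failureTracker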

/**
* A sequence of pending container requests that have not yet been fulfilled.
@@ -204,35 +185,23 @@ private[yarn] class YarnAllocator(
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
* @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
* containers on them. It will be used to update the application master's
* blacklist.
* @param schedulerBlacklistedNodesWithExpiry blacklisted nodes with expiry times, which is passed
Contributor:
Another name that is a mouthful... the previous name was fine. Or maybe schedulerBlacklist if you want to emphasize where it is supposed to come from.

* in to avoid allocating new containers on them. It will be used to update
Contributor:
nit: align with start of comment on previous line

* the application master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
nodeBlacklist: Set[String]): Boolean = synchronized {
requestedTotal: Int,
Contributor:
double indent

localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
schedulerBlacklistedNodesWithExpiry: Map[String, Long]): Boolean = synchronized {
this.numLocalityAwareTasks = localityAwareTasks
this.hostToLocalTaskCounts = hostToLocalTaskCount

if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal

// Update blacklist infomation to YARN ResouceManager for this application,
// in order to avoid allocating new Containers on the problematic nodes.
val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
if (blacklistAdditions.nonEmpty) {
logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
}
if (blacklistRemovals.nonEmpty) {
logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
}
amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
currentNodeBlacklist = nodeBlacklist
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry)
true
} else {
false
@@ -268,6 +237,7 @@ private[yarn] class YarnAllocator(
val allocateResponse = amClient.allocate(progressIndicator)

val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +572,7 @@ private[yarn] class YarnAllocator(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
// Enqueue the timestamp of failed executor
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
Contributor:
Are we actually totally sure this is an allocation failure? It's really any failure which isn't covered by the cases above, right? For example, it could be somebody logging in to the box and running kill -9 on the executor (people do weird things), or sudden hardware failure, etc. I think the handling is still fine, but I'd add a comment here explaining a bit.

If we really want to separate out allocation failures, we'd need the handling to be a little more complex; we'd have to keep track of executors that have registered with the driver.

@tgravescs sound ok?

(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)