[SPARK-16630][YARN] Blacklist a node if executors won't launch on it #21068
Changes from 3 commits
@@ -77,7 +77,7 @@ private[scheduler] class BlacklistTracker (
   * An immutable copy of the set of nodes that are currently blacklisted. Kept in an
   * AtomicReference to make [[nodeBlacklist()]] thread-safe.
   */
-  private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
+  private val _nodeBlacklist = new AtomicReference[Map[String, Long]](Map())
  /**
   * Time when the next blacklist will expire. Used as a
   * shortcut to avoid iterating over all entries in the blacklist when none will have expired.
@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
        nodeIdToBlacklistExpiryTime.remove(node)
        listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
      }
-     _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+     updateNodeBlacklist()
    }
    updateNextExpiryTime()
  }
@@ -196,7 +196,7 @@ private[scheduler] class BlacklistTracker (
      nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
      listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
-     _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+     updateNodeBlacklist()
      killExecutorsOnBlacklistedNode(host)
      updateNextExpiryTime()
    }
@@ -216,6 +216,10 @@ private[scheduler] class BlacklistTracker (
      }
    }

+  private def updateNodeBlacklist(): Unit = {
+    _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
+  }
+
  def updateBlacklistForSuccessfulTaskSet(
      stageId: Int,
      stageAttemptId: Int,
@@ -258,7 +262,7 @@ private[scheduler] class BlacklistTracker (
        s"executors blacklisted: ${blacklistedExecsOnNode}")
      nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
      listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
-     _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+     updateNodeBlacklist()
      killExecutorsOnBlacklistedNode(node)
    }
  }
@@ -273,7 +277,7 @@ private[scheduler] class BlacklistTracker (
   * Get the full set of nodes that are blacklisted. Unlike other methods in this class, this *IS*
   * thread-safe -- no lock required on a taskScheduler.
   */
-  def nodeBlacklist(): Set[String] = {
+  def nodeBlacklist(): Map[String, Long] = {
    _nodeBlacklist.get()
  }

Review comment: update the comment here and on ...
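The snapshot published through the AtomicReference now carries an expiry time per host instead of a bare set of hostnames, so a consumer can drop entries that have already expired without taking the TaskScheduler lock. A minimal sketch of how such a snapshot could be consumed (the helper name is illustrative, not part of this patch):

```scala
// Sketch: filter a blacklist snapshot (host -> expiry time in ms) down to the
// hosts that are still blacklisted at the given time.
def activeBlacklistedNodes(snapshot: Map[String, Long], now: Long): Set[String] =
  snapshot.collect { case (host, expiryTime) if expiryTime > now => host }.toSet
```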
@@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl(
   * Get a snapshot of the currently blacklisted nodes for the entire application. This is
   * thread-safe -- it can be called without a lock on the TaskScheduler.
   */
-  def nodeBlacklist(): scala.collection.immutable.Set[String] = {
-    blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
+  def nodeBlacklistWithExpiryTimes(): scala.collection.immutable.Map[String, Long] = {
+    blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Map())
  }

  // By default, rack is unknown

Review comment: Why not just ...? I kinda find it odd when I see these types used this way, so unless there's a good reason...
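A minimal sketch of what the reviewer appears to be suggesting: rely on the default `Map`, which is already `scala.collection.immutable.Map`, instead of spelling out the fully qualified type. This is one reading of the truncated comment, assuming the `blacklistTrackerOpt` context from the diff above, not the merged code:

```scala
// Same behaviour, simpler types: Predef.Map is scala.collection.immutable.Map,
// so the fully qualified names add nothing here.
def nodeBlacklistWithExpiryTimes(): Map[String, Long] =
  blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(Map.empty)
```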
@@ -170,8 +170,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
-       } else if (scheduler.nodeBlacklist != null &&
-           scheduler.nodeBlacklist.contains(hostname)) {
+       } else if (scheduler.nodeBlacklistWithExpiryTimes != null &&
+           scheduler.nodeBlacklistWithExpiryTimes.contains(hostname)) {
          // If the cluster manager gives us an executor on a blacklisted node (because it
          // already started allocating those resources before we informed it of our blacklist,
          // or if it ignored our blacklist), then we reject that executor immediately.

Review comment: ... Also calling that method twice causes unnecessary computation...
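One way to address the reviewer's point about the double call is to bind the snapshot to a local value before checking it. A sketch only, reusing the names from the diff above; this is not the code that was merged:

```scala
// Fetch the blacklist snapshot once and reuse it for both the null check and the lookup.
val blacklistedNodes = scheduler.nodeBlacklistWithExpiryTimes
if (blacklistedNodes != null && blacklistedNodes.contains(hostname)) {
  // reject the executor on the blacklisted node, as in the branch above
}
```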
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.yarn

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {

Review comment: Add scaladoc explaining what this does?

  private var clock: Clock = new SystemClock

Review comment: This should be a constructor argument.

  private val executorFailuresValidityInterval =
    sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)

  // Queue to store the timestamp of failed executors for each host
  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

  private def getNumFailuresWithinValidityInterval(
      failedExecutorsTimeStampsForHost: mutable.Queue[Long],
      endTime: Long): Int = {

Review comment: It's not really clear what a 'validity interval' is. I think it means that only failures that have happened recently are considered valid? I think it would be clearer to call this ... If you do the latter, and drop the ...

Reply: Thanks, then getRecentFailureCount will be the method name, without the endTime argument.

    while (executorFailuresValidityInterval > 0
      && failedExecutorsTimeStampsForHost.nonEmpty

Review comment: nit: ...

      && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {

Review comment: This relies on the fact the ...

Reply: This code is coming from YarnAllocator.scala#L175. As I see it, the common solution is to use Clock, SystemClock and ManualClock in Spark. And here the validity time is much higher than the diff NTP can apply.

      failedExecutorsTimeStampsForHost.dequeue()

Review comment: It's counter-intuitive that this ... The last call can return something different from the first because all the failures that weren't within ...

Reply: Ok, I will take your recommendation and drop endTime, renaming the method to getRecentFailureCount.

    }
    failedExecutorsTimeStampsForHost.size
  }

  /**
   * Use a different clock. This is mainly used for testing.
   */
  def setClock(newClock: Clock): Unit = {
    clock = newClock
  }

  def getNumExecutorsFailed: Int = synchronized {
    getNumFailuresWithinValidityInterval(failedExecutorsTimeStamps, clock.getTimeMillis())
  }

  def registerFailureOnHost(hostname: String): Unit = synchronized {
    val timeMillis = clock.getTimeMillis()
    failedExecutorsTimeStamps.enqueue(timeMillis)
    val failedExecutorsOnHost =
      failedExecutorsTimeStampsPerHost.getOrElse(hostname, {
        val failureOnHost = mutable.Queue[Long]()
        failedExecutorsTimeStampsPerHost.put(hostname, failureOnHost)
        failureOnHost
      })
    failedExecutorsOnHost.enqueue(timeMillis)
  }

  def registerExecutorFailure(): Unit = synchronized {
    val timeMillis = clock.getTimeMillis()
    failedExecutorsTimeStamps.enqueue(timeMillis)
  }

  def getNumExecutorFailuresOnHost(hostname: String): Int =

Review comment: nit: add braces in multi-line methods. Also, I know the YARN module is generally this way, but this seems like a good chance to start to use the more usual Spark convention of not naming methods with "get" (and also take the chance to make the name shorter). e.g., ...

    failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
      getNumFailuresWithinValidityInterval(failedExecutorsOnHost, clock.getTimeMillis())
    }.getOrElse(0)
}
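Pulling the review suggestions together, the class might end up looking roughly like the sketch below: the clock becomes a constructor argument, the class gets a scaladoc, the "get" prefixes and the `endTime` parameter are dropped, and multi-line methods get braces. The names `FailureTracker`, `numFailedExecutors` and `numFailuresOnHost` are placeholders inferred from the truncated comments above, not the merged API:

```scala
package org.apache.spark.deploy.yarn

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

/**
 * Tracks executor failures, both globally and per host, and reports how many of them
 * happened recently, i.e. within the configured validity interval.
 * (Class and method names are placeholders based on the review discussion.)
 */
private[spark] class FailureTracker(
    sparkConf: SparkConf,
    clock: Clock = new SystemClock) extends Logging {

  private val executorFailuresValidityInterval =
    sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)

  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

  // Drops timestamps older than the validity interval, then returns what is left.
  private def getRecentFailureCount(failureTimestamps: mutable.Queue[Long]): Int = {
    if (executorFailuresValidityInterval > 0) {
      val cutoff = clock.getTimeMillis() - executorFailuresValidityInterval
      while (failureTimestamps.nonEmpty && failureTimestamps.head < cutoff) {
        failureTimestamps.dequeue()
      }
    }
    failureTimestamps.size
  }

  def numFailedExecutors: Int = synchronized {
    getRecentFailureCount(failedExecutorsTimeStamps)
  }

  def registerFailureOnHost(hostname: String): Unit = synchronized {
    val timeMillis = clock.getTimeMillis()
    failedExecutorsTimeStamps.enqueue(timeMillis)
    failedExecutorsTimeStampsPerHost
      .getOrElseUpdate(hostname, mutable.Queue[Long]())
      .enqueue(timeMillis)
  }

  def registerExecutorFailure(): Unit = synchronized {
    failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
  }

  def numFailuresOnHost(hostname: String): Int = synchronized {
    failedExecutorsTimeStampsPerHost.get(hostname)
      .map(getRecentFailureCount)
      .getOrElse(0)
  }
}
```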
@@ -42,7 +42,7 @@ import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
 * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,18 +102,14 @@ private[yarn] class YarnAllocator(
  private var executorIdCounter: Int =
    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

-  // Queue to store the timestamp of failed executors
-  private val failedExecutorsTimeStamps = new Queue[Long]()
+  private val failureWithinTimeIntervalTracker = new FailureWithinTimeIntervalTracker(sparkConf)

-  private var clock: Clock = new SystemClock
-
-  private val executorFailuresValidityInterval =
-    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+  private val allocatorBlacklistTracker =
+    new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureWithinTimeIntervalTracker)

  @volatile private var targetNumExecutors =
    SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)

-  private var currentNodeBlacklist = Set.empty[String]

  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
  // list of requesters that should be responded to once we find out why the given executor
@@ -149,7 +145,6 @@ private[yarn] class YarnAllocator(
  private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)

  // A map to store preferred hostname and possible task numbers running on it.
  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -160,26 +155,12 @@ private[yarn] class YarnAllocator(
  private[yarn] val containerPlacementStrategy =
    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

-  /**
-   * Use a different clock for YarnAllocator. This is mainly used for testing.
-   */
-  def setClock(newClock: Clock): Unit = {
-    clock = newClock
-  }

  def getNumExecutorsRunning: Int = runningExecutors.size()

-  def getNumExecutorsFailed: Int = synchronized {
-    val endTime = clock.getTimeMillis()
+  def getNumExecutorsFailed: Int = failureWithinTimeIntervalTracker.getNumExecutorsFailed

-    while (executorFailuresValidityInterval > 0
-      && failedExecutorsTimeStamps.nonEmpty
-      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
-      failedExecutorsTimeStamps.dequeue()
-    }
-
-    failedExecutorsTimeStamps.size
-  }
+  def getFailureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker =
+    failureWithinTimeIntervalTracker

Review comment: This name is kinda awkward. How about just ...? (e.g. if you add any logic that is not time-based to that class, then its name becomes inaccurate.) Also, instead of a getter you could just make the field public (it's already a ...)

  /**
   * A sequence of pending container requests that have not yet been fulfilled.
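A small sketch of the reviewer's alternative of exposing the field directly rather than adding a getter. The shorter name `failureTracker` is a placeholder, since the suggested name in the comment was cut off:

```scala
// Expose the tracker as a package-private field instead of wrapping it in a getter.
private[yarn] val failureTracker = new FailureTracker(sparkConf)

// Callers inside the yarn module can then reach it directly, e.g.:
// allocator.failureTracker.registerFailureOnHost(hostname)
```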
@@ -204,35 +185,23 @@ private[yarn] class YarnAllocator(
   * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
   *                             container placement hint.
-   * @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
-   *                      containers on them. It will be used to update the application master's
-   *                      blacklist.
+   * @param schedulerBlacklistedNodesWithExpiry blacklisted nodes with expiry times, which is passed

Review comment: Another name that is a mouthful... the previous name was fine. Or maybe ...

+   *                      in to avoid allocating new containers on them. It will be used to update

Review comment: nit: align with start of comment on previous line

+   *                      the application master's blacklist.
   * @return Whether the new requested total is different than the old value.
   */
  def requestTotalExecutorsWithPreferredLocalities(
-     requestedTotal: Int,
-     localityAwareTasks: Int,
-     hostToLocalTaskCount: Map[String, Int],
-     nodeBlacklist: Set[String]): Boolean = synchronized {
+     requestedTotal: Int,

Review comment: double indent

+     localityAwareTasks: Int,
+     hostToLocalTaskCount: Map[String, Int],
+     schedulerBlacklistedNodesWithExpiry: Map[String, Long]): Boolean = synchronized {
    this.numLocalityAwareTasks = localityAwareTasks
    this.hostToLocalTaskCounts = hostToLocalTaskCount

    if (requestedTotal != targetNumExecutors) {
      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
      targetNumExecutors = requestedTotal

-      // Update blacklist infomation to YARN ResouceManager for this application,
-      // in order to avoid allocating new Containers on the problematic nodes.
-      val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
-      val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
-      if (blacklistAdditions.nonEmpty) {
-        logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
-      }
-      if (blacklistRemovals.nonEmpty) {
-        logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
-      }
-      amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
-      currentNodeBlacklist = nodeBlacklist
+      allocatorBlacklistTracker.setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry)
      true
    } else {
      false
@@ -268,6 +237,7 @@ private[yarn] class YarnAllocator(
    val allocateResponse = amClient.allocate(progressIndicator)

    val allocatedContainers = allocateResponse.getAllocatedContainers()
+   allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

    if (allocatedContainers.size > 0) {
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +572,7 @@ private[yarn] class YarnAllocator(
            completedContainer.getDiagnostics,
            PMEM_EXCEEDED_PATTERN))
        case _ =>
-         // Enqueue the timestamp of failed executor
-         failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
+         allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)

Review comment: are we actually totally sure this is an allocation failure? It's really any failure which isn't covered by the cases above, right? For example, it could be somebody logging in to the box and running ... If we really want to separate out allocation failures, we'd need the handling to be a little more complex; we'd have to keep track of executors that have registered with the driver. @tgravescs sound ok?

          (true, "Container marked as failed: " + containerId + onHostStr +
            ". Exit status: " + completedContainer.getExitStatus +
            ". Diagnostics: " + completedContainer.getDiagnostics)
Review comment: this function seems unnecessary to me; I don't see it adding any value vs doing it inline.

Reply: ok