[SPARK-16630][YARN] Blacklist a node if executors won't launch on it #21068
Conversation
Jenkins, test this please
Jenkins, add to whitelist
Test build #89347 has finished for PR 21068 at commit
Test build #89348 has finished for PR 21068 at commit
Test build #89350 has finished for PR 21068 at commit
just really minor comments from a first read, need to spend more time understanding it all better
// Queue to store the timestamp of failed executors for each host
private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

private val sumFailedExecutorsTimeStamps = new mutable.Queue[Long]()
Why is this called "sum"? I think the old name failedExecutorTimestamps is more appropriate; same for the other places where you added "sum".
BLACKLIST_SIZE_LIMIT.getOrElse((numClusterNodes * BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt)
val nodesToBlacklist =
  if (schedulerBlacklistedNodesWithExpiry.size +
    allocationBlacklistedNodesWithExpiry.size > limit) {
nit: double-indent the continuation of the if condition. (We don't do this everywhere, but we should; I find it helps.)
I think it's worth considering whether we can make these changes less YARN-specific. Really we're only getting a bit of info from the cluster manager:
- the container failed during allocation
- how many nodes are on the cluster
and we only need to have the combined set of blacklisted nodes available to the cluster manager. The rest of the logic could live within BlacklistTracker (or some similar helper), which doesn't need to know about the cluster manager at all.
Other than renaming, the significant change that would mean is that all the logic in YarnAllocatorBlacklistTracker would need to move to the driver instead of the AM, so it would change the messages somewhat. In particular I think you'd need to change the ExecutorExited message to include whether it was a failure to even allocate the container (a rough sketch follows below).
This way it would be easier to add this for Mesos (there are already Mesos changes that are sort of waiting on this) and Kubernetes.
@tgravescs thoughts?
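To make the suggestion concrete, a hypothetical sketch of such a message is shown here; the extra field name is invented for illustration and is not part of this PR:

// Hypothetical sketch only: an ExecutorExited-style message carrying whether the
// container ever launched on the node (field name invented for illustration).
case class ExecutorExited(
    exitCode: Int,
    exitCausedByApp: Boolean,
    failedBeforeLaunch: Boolean, // new: allocation/launch failed before the executor started
    reason: String)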
Test build #89355 has finished for PR 21068 at commit
Test build #89373 has finished for PR 21068 at commit
Just took a quick look - I would work on Imran's advice first and then see if any of my comments are still valid.
}

private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
  if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
consider just:
if (!IS_YARN_ALLOCATION_BLACKLIST_ENABLED) return;
to save a level of indentation below.
As far as I know, using return in Scala is mostly discouraged. Anyway, here we have only two levels of indentation, so I would keep these if conditions as they are.
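For reference, a minimal sketch of the two styles being discussed; the class name, flag and method bodies are placeholders, not the PR's actual code:

class GuardClauseExample(allocationBlacklistEnabled: Boolean) {
  // Early-return style suggested above (saves one level of indentation):
  def updateWithReturn(hostname: String): Unit = {
    if (!allocationBlacklistEnabled) return
    // ...track the failure and possibly blacklist `hostname`...
  }

  // Nested-if style kept in the PR, since `return` is generally discouraged in Scala:
  def updateWithNestedIf(hostname: String): Unit = {
    if (allocationBlacklistEnabled) {
      // ...track the failure and possibly blacklist `hostname`...
    }
  }
}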
if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
  val failuresOnHost = failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname)
  if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) {
    logInfo("blacklisting host as YARN allocation failed: %s".format(hostname))
log msg could include the number of failures
thanks, I will add it
private var currentBlacklistedYarnNodes = Set.empty[String]

private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
Do you need to keep a separate data structure for the scheduler and allocator blacklisted nodes? Instead, could you add the scheduler ones into a shared map when setSchedulerBlacklistedNodes is called?
We have to store them separately, as there are two sources of blacklisted nodes and they are updated separately via the two setters, where the complete state of each blacklisted set arrives rather than a diff (another reason is that only the expiry of allocator-blacklisted nodes is handled by YarnAllocatorBlacklistTracker).
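A minimal sketch of that arrangement, using the field and setter names quoted in this review (the wrapper class name is invented and this is not necessarily the merged code):

class BlacklistSources {
  // each source keeps its own complete state
  private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
  private var allocationBlacklistedNodesWithExpiry = Map.empty[String, Long]

  // the scheduler setter replaces the whole scheduler-side state, not a diff
  def setSchedulerBlacklistedNodes(nodes: Map[String, Long]): Unit = {
    schedulerBlacklistedNodesWithExpiry = nodes
  }

  // allocator-side entries are added (and expired) by the tracker itself
  def addAllocationBlacklistedNode(host: String, expiry: Long): Unit = {
    allocationBlacklistedNodesWithExpiry += (host -> expiry)
  }

  // only the union is forwarded to YARN
  def combinedBlacklist: Set[String] =
    schedulerBlacklistedNodesWithExpiry.keySet ++ allocationBlacklistedNodesWithExpiry.keySet
}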
while (executorFailuresValidityInterval > 0
    && failedExecutorsTimeStampsForHost.nonEmpty
    && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
  failedExecutorsTimeStampsForHost.dequeue()
It's counter-intuitive that this get* method mutates state. If I called

getNumFailuresWithinValidityInterval(foo, 0)
getNumFailuresWithinValidityInterval(foo, 10)
getNumFailuresWithinValidityInterval(foo, 0)

the last call can return something different from the first, because all the failures that weren't within 10 - executorFailuresValidityInterval will have been dropped.
Ok, I will take your recommendation and drop endTime, renaming the method to getRecentFailureCount.
    endTime: Long): Int = {
  while (executorFailuresValidityInterval > 0
      && failedExecutorsTimeStampsForHost.nonEmpty
      && failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
This relies on the fact that the clock is monotonic, but if it's a SystemClock it's based on System.currentTimeMillis(), which is not monotonic and can time-travel.
This code is coming from YarnAllocator.scala#L175.
As I see it, the common solution in Spark is to use Clock, SystemClock and ManualClock. And here the validity time is much higher than the adjustment NTP can apply.
private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

private def getNumFailuresWithinValidityInterval(
It's not really clear what a 'validity interval' is. I think it means that only failures that have happened recently are considered valid? I think it would be clearer to call this getNumFailuresSince(), or getRecentFailureCount() or similar, and explicitly pass in the timestamp the caller wants to consider failures since.
If you do the latter, and drop the endTime argument, then you partly address the issue I raise below about how this mutates state, because getRecentFailureCount() suggests more clearly that it's expected to take the current time into account.
Thanks, then getRecentFailureCount will be the method name, without the endTime argument.
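A rough sketch of the renamed method, assembled from the snippets quoted in this review; the wrapper class name is invented and a plain () => Long stands in for Spark's Clock to keep the sketch self-contained:

import scala.collection.mutable

class RecentFailureCount(clock: () => Long, executorFailuresValidityInterval: Long) {
  // prunes expired timestamps against the current time, then returns what remains
  def getRecentFailureCount(failedExecutorsTimeStamps: mutable.Queue[Long]): Int = {
    val endTime = clock()
    while (executorFailuresValidityInterval > 0 &&
        failedExecutorsTimeStamps.nonEmpty &&
        failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
      failedExecutorsTimeStamps.dequeue()
    }
    failedExecutorsTimeStamps.size
  }
}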
If we can move it to be common I think that would be good.
@squito do you know if mesos and/or kubernetes can provide this same information?
@@ -216,6 +216,10 @@ private[scheduler] class BlacklistTracker (
    }
  }

  private def updateNodeBlacklist(): Unit = {
this function seems unnecessary to me, I don't see it adding any value vs doing it inline.
ok
.booleanConf
.createOptional

private[spark] val YARN_BLACKLIST_SIZE_LIMIT =
why do we want both this and the spark.yarn.blacklist.size.default.weight?
we can remove it
@@ -328,4 +328,26 @@ package object config {
    CACHED_FILES_TYPES,
    CACHED_CONF_ARCHIVE)

  /* YARN allocator-level blacklisting related config entries. */
  private[spark] val YARN_ALLOCATION_BLACKLIST_ENABLED =
    ConfigBuilder("spark.yarn.allocation.blacklist.enabled")
I would say either just call it spark.yarn.blacklist.enabled, or make it more specific that this is executor-launch-failure blacklisting.
First I named it "spark.yarn.blacklist.enabled", but then I wondered whether a user would confuse it with YARN's own blacklisting, so I added the "allocation" part. So I would go for the second option: "spark.yarn.executor.launch.blacklist.enabled".
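Roughly, the renamed entry would look like the sketch below, using the same ConfigBuilder DSL as the surrounding config object; this is illustrative only, and the doc text and default are assumptions here:

private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
  ConfigBuilder("spark.yarn.executor.launch.blacklist.enabled")
    .doc("Whether to blacklist nodes in the YARN allocator when executors repeatedly " +
      "fail to launch on them.")
    .booleanConf
    .createWithDefault(false)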
private[spark] val YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT =
  ConfigBuilder("spark.yarn.blacklist.size.default.weight")
    .doc("If blacklist size limit is not specified then the default limit will be the number of " +
perhaps rename it to something like spark.yarn.blacklist.maxNodeBlacklistRatio. (Note we are talking about using a Ratio in another config here: #19881.)
ok
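A sketch of the suggested ratio config, reusing the checkValue and default quoted later in this review; treat it as illustrative, since the size-limit approach was eventually replaced by failing the app when the whole cluster is blacklisted:

private[spark] val YARN_BLACKLIST_MAX_NODE_BLACKLIST_RATIO =
  ConfigBuilder("spark.yarn.blacklist.maxNodeBlacklistRatio")
    .doc("Maximum fraction of the cluster's nodes the YARN allocator may blacklist.")
    .doubleConf
    .checkValue(weight => weight >= 0 && weight <= 1, "The value of this ratio must be in [0, 1].")
    .createWithDefault(0.75)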
actually the only other thing I need to make sure of is that there aren't any delays if we now send the information from the yarn allocator back to the scheduler and then, I assume, it would need to get it back again from the scheduler. During that time the yarn allocator could be calling allocate() and updating things, so we need to make sure it gets the most up-to-date blacklist. Also, I need to double check, but the blacklist information isn't being sent to the yarn allocator when dynamic allocation is off, right? We would want that to happen.
yeah both good points. actually, don't we want to update the general node blacklist on the yarn allocator even when dynamic allocation is off? I don't think it gets updated at all unless dynamic allocation is on, it seems all the updates originate in
I don't know about kubernetes at all. Mesos does provide info when a container fails. I don't think it lets you know the total cluster size, but that should be optional. Btw, node count is never going to be totally sufficient, as the remaining nodes might not actually be able to run your executors (smaller hardware, always taken up by higher priority applications, other constraints in a framework like mesos); it's always going to be best effort. @attilapiros and I discussed this briefly yesterday; an alternative to moving everything into the BlacklistTracker on the driver is to just have some abstract base class, which is changed slightly for each cluster manager. Then you could keep the flow like it is here, with the extra blacklisting living in YarnAllocator still.
Yes, we can create an abstract class from it. We just have to make up our minds where to go from here. Any help and suggestions are welcome for the decision.
I think Tom makes a good case for why this should live in the YarnAllocator as you have it. I also don't think you need to worry about creating an abstract class yet; that refactoring can be done when another cluster manager tries to share some code ... it would just be helpful to keep that use in mind. Also, I filed https://issues.apache.org/jira/browse/SPARK-24016 for updating the task-based node blacklist even with static allocation.
thanks for filing that jira @squito, I agree we should have blacklisting work with dynamic allocation disabled as well. (A bit of a tangent from this jira:) I'm actually wondering now about the scheduler blacklisting and whether it should have a max blacklisted ratio as well; I don't remember if we discussed this previously. For this, I'm fine either way: if there are people interested in doing the mesos/kubernetes stuff now, we could certainly coordinate with them to see if there is something common we could do now. I haven't had time to keep up with those jiras to know, though. Otherwise this isn't public facing, so we can do that when they decide to implement it.
Test build #89514 has finished for PR 21068 at commit
@tgravescs on the blacklist ratio for task-based blacklisting -- there is nothing, but there are some related jiras: SPARK-22148 & SPARK-15815. To be honest, I have doubts about the utility of the ratio ... if you really want to make sure blacklisting doesn't lead to starvation, you've got to have some other mechanism, as you could easily have the remaining nodes be occupied or have insufficient resources. Kubernetes doesn't do anything with the node blacklisting currently: SPARK-23485. Mesos already has a notion of blacklisting nodes for failing to allocate containers, but it's currently at odds with the task-based blacklist; #20640 is somewhat stalled because blacklisting based on allocation failures is missing in a general sense. In any case, I still think we shouldn't make the code more complex for something other cluster managers might use in the future, and that the current overall organization is fine.
ok sounds fine to me, so we should review as is then
A couple more high-level thoughts:
I did a first pass and mostly pointed out stylistic stuff... I need a second pass to take a closer look at the functionality. Didn't see any red flags though.
@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
      nodeIdToBlacklistExpiryTime.remove(node)
      listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
    }
    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
    _nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
Isn't this the same as calling nodeIdToBlacklistExpiryTime.toMap? (That returns an immutable map.)
At the very least, the collection.immutable. part looks unnecessary. The same thing happens below.
@@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl (
   * Get a snapshot of the currently blacklisted nodes for the entire application. This is
   * thread-safe -- it can be called without a lock on the TaskScheduler.
   */
  def nodeBlacklist(): scala.collection.immutable.Set[String] = {
    blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
  def nodeBlacklistWithExpiryTimes(): scala.collection.immutable.Map[String, Long] = {
Why not just Map[String, Long]?
I kinda find it odd when I see these types used this way, so unless there's a good reason...
@@ -170,8 +170,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    if (executorDataMap.contains(executorId)) {
      executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      context.reply(true)
    } else if (scheduler.nodeBlacklist != null &&
      scheduler.nodeBlacklist.contains(hostname)) {
    } else if (scheduler.nodeBlacklistWithExpiryTimes != null &&
nodeBlacklistWithExpiryTimes is never null right?
Also calling that method twice causes unnecessary computation...
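One way to address both points, sketched against the names in the quoted diff (illustrative only, not the actual fix):

// compute the blacklist once and reuse it; no null check needed
val blacklistedNodes = scheduler.nodeBlacklistWithExpiryTimes
if (blacklistedNodes.contains(hostname)) {
  // reject the executor registration
}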
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {
Add scaladoc explaining what this does?
private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {

  private var clock: Clock = new SystemClock
This should be a constructor argument.
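That is, something along these lines (sketch only; Clock and SystemClock are the imports shown in the quoted diff, and the default value is an assumption):

private[spark] class FailureWithinTimeIntervalTracker(
    sparkConf: SparkConf,
    clock: Clock = new SystemClock) extends Logging {
  // ...
}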
private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = {
  // Update blacklist information to YARN ResourceManager for this application,
  // in order to avoid allocating new Containers on the problematic nodes.
  val blacklistAdditions = (nodesToBlacklist -- currentBlacklistedYarnNodes).toList.sorted
additions, removals are just as good names for these variables.
private def removeExpiredYarnBlacklistedNodes() = {
  val now = clock.getTimeMillis()
  allocationBlacklistedNodesWithExpiry.retain {
Doesn't this work?

allocationBlacklistedNodesWithExpiry.retain { case (_, expiry) =>
  ...
}
.doubleConf
.checkValue(weight => weight >= 0 && weight <= 1, "The value of this ratio must be in [0, 1].")
.createWithDefault(0.75)
Too many blank lines.
test("expiring its own blacklisted nodes") { | ||
clock.setTime(0L) | ||
|
||
1 to MAX_FAILED_EXEC_PER_NODE_VALUE foreach { |
(1 to blah).foreach { _ =>
  ...
}
}

test("not handling the expiry of scheduler blacklisted nodes") {
  clock.setTime(0L)
redundant?
Test build #89889 has finished for PR 21068 at commit
Test build #91307 has finished for PR 21068 at commit
hey, sorry, I have been meaning to respond to this but keep getting sidetracked. As Tom and I are going to meet in person next week anyway, I figure at this point it makes sense to just wait till we chat directly to make sure we're on the same page. It sounds like we're in agreement, but at this point we might as well wait a couple more days, as I haven't had a chance to do a final review anyway.
Tom and I had a chance to discuss this in person, and after some back and forth I think we decided that maybe it's best to remove the limit but have the application fail if the entire cluster is blacklisted. @tgravescs does that sound correct? I mentioned this briefly to @attilapiros and he mentioned that might be hard, but instead you could stop allocation blacklisting, which would result in the usual yarn app failure from too many executors. He's going to look at this a little more closely and report back here. I'd be OK with that -- the main goal is just to make sure that an app doesn't hang if you've blacklisted the entire cluster. I'm pretty sure that's @tgravescs's main concern as well. (If the only reasonable way to do that is with the existing limit, I'm fine with that too.)
Test build #91764 has finished for PR 21068 at commit
Test build #91779 has finished for PR 21068 at commit
a couple of minor things, but overall lgtm
@attilapiros can you please test this latest version on a cluster again?
@tgravescs this version will kill the app when the whole cluster is blacklisted, attila found out it was easy to do.
private val defaultTimeout = "1h"

private val blacklistTimeoutMillis =
BlacklistTracker.getBlacklistTimeout(conf)
Ok, then I'll relax the visibility of BlacklistTracker a bit by changing it from private[scheduler] to private[spark].
val endTime = clock.getTimeMillis()
while (executorFailuresValidityInterval > 0 &&
  failedExecutorsWithTimeStamps.nonEmpty &&
  failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
double indent the condition
failedExecutorsWithTimeStamps.dequeue()
  failedExecutorsWithTimeStamps.nonEmpty &&
  failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
failedExecutorsWithTimeStamps.dequeue()
but only single indent the body of the while
Test build #91860 has finished for PR 21068 at commit
Retested manually on a cluster. The PR's description is updated with the result.
Test build #91905 has finished for PR 21068 at commit
Test build #91907 has finished for PR 21068 at commit
Jenkins, retest this please
lgtm, will leave open for a couple of days to let @tgravescs take a look
 *
 * <ul>
 * <li> from the scheduler as task level blacklisted nodes
 * <li> from this class (tracked here) as YARN resource allocation problems
This source never touches the scheduler's blacklist right?
Right. It's just the other way around: the scheduler's blacklisted hosts are sent here to be forwarded to YARN, so that they are taken into account during resource allocation.
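A simplified sketch of that forwarding path, based on the synchronizeBlacklistedNodeWithYarn snippet quoted earlier in this review; updateBlacklist is the existing AMRMClient API, while amClient and currentBlacklistedYarnNodes are assumed to be fields of the tracker:

import scala.collection.JavaConverters._

private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = {
  val additions = (nodesToBlacklist -- currentBlacklistedYarnNodes).toList.sorted
  val removals = (currentBlacklistedYarnNodes -- nodesToBlacklist).toList.sorted
  if (additions.nonEmpty || removals.nonEmpty) {
    amClient.updateBlacklist(additions.asJava, removals.asJava)
  }
  currentBlacklistedYarnNodes = nodesToBlacklist
}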
Test build #91920 has finished for PR 21068 at commit
👍 this is going to be very useful
private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
  val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
  if (failuresOnHost > maxFailuresPerHost) {
    logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
maybe logWarn?
would be great if there is a metric on failuresOnHost count...
Thanks, I am happy you consider this change useful.
Regarding logInfo: I chose it to be consistent with the logging of the existing BlacklistTracker, where blacklisting itself is treated as part of normal behaviour and logInfo is used. But if you feel strongly about logWarn I can make the change.
For the metrics, I did a quick search in the yarn module and it seems no metrics are currently coming from there, so the change is probably not just a few lines. What about me creating a new JIRA task for it? Is that fine with you?
yes, exposing metrics is not a bad idea, but I'd like to leave it out of this change
@felixcheung I have started to gain some experience with metrics (as I worked on SPARK-24594), and it seems to me the structure of the metrics (the metric names) must be known and registered before the metrics system is started. So I can add a new metric for ALL the failures, but not for each host separately; with the console sink it would look like:

-- Gauges ----------------------------------------------------------------------
yarn_cluster.executorFailures.ALL
  value = 3

Aggregated values would also be possible. Any idea what would be the most valuable for Spark users given this restriction?
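For what it's worth, a single aggregated gauge could be registered roughly as sketched below; Source and MetricRegistry are the existing Spark/Dropwizard metrics types, while the class name and the numFailedExecutors provider are assumptions made for illustration:

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

private[spark] class YarnAllocatorSource(numFailedExecutors: () => Int) extends Source {
  override val sourceName: String = "yarn_cluster"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  // intended to correspond to the "yarn_cluster.executorFailures.ALL" gauge shown above
  metricRegistry.register(MetricRegistry.name("executorFailures", "ALL"), new Gauge[Int] {
    override def getValue: Int = numFailedExecutors()
  })
}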
I was thinking a bit more about this problem and I have an idea: creating metrics for the number of hosts with a predetermined number of executor failures, like yarn_cluster.numHostWithExecutorFailures.x where x is [1, ..., max(10, spark.blacklist.application.maxFailedExecutorsPerNode if blacklisting is enabled, spark.yarn.max.executor.failures if set)]. What is your opinion?
@@ -328,4 +328,10 @@ package object config {
    CACHED_FILES_TYPES,
    CACHED_CONF_ARCHIVE)

  /* YARN allocator-level blacklisting related config entries. */
  private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
need to document this in docs/running-on-yarn.md
Looks like it was modified to kill the app if all nodes are blacklisted, so I'm good with this approach.
Test build #92031 has finished for PR 21068 at commit
retest this please
Test build #92040 has finished for PR 21068 at commit
Here is the new task for the metrics: https://issues.apache.org/jira/browse/SPARK-24594.
merged to master. Thanks @attilapiros !
What changes were proposed in this pull request?
This change extends YARN resource allocation handling with blacklisting functionality.
This handles cases when a node is messed up or misconfigured such that a container won't launch on it. Before this change, blacklisting only focused on task execution, but this change introduces YarnAllocatorBlacklistTracker, which tracks allocation failures per host (when enabled via "spark.yarn.blacklist.executor.launch.blacklisting.enabled").
How was this patch tested?
With unit tests
Including a new suite: YarnAllocatorBlacklistTrackerSuite.
Manually
It was tested on a cluster by deleting the Spark jars on one of the nodes.
Behaviour before these changes
Starting Spark as:
Log is:
Behaviour after these changes
Starting Spark as:
And the log is:
Where the most important part is:
And execution was continued (no shutdown called).
Testing the blacklisting of the whole cluster
Starting Spark with YARN blacklisting enabled, then removing the Spark core jar one by one from all the cluster nodes. Then executing a simple Spark job, which fails; checking the YARN log, the expected exit status is present: