[SPARK-13669][SPARK-20898][CORE] Improve the blacklist mechanism to handle external shuffle service unavailable situation

## What changes were proposed in this pull request?

Currently we are running into an issue when YARN work-preserving restart is enabled together with the external shuffle service.

In the work-preserving scenario, a NodeManager (NM) failure does not cause its executors to exit, so those executors can still accept and run tasks. The problem is that when the NM is down, the external shuffle service it hosts is inaccessible, so reduce tasks keep failing with “Fetch failure”, and the failure of the reduce stage makes the parent (map) stage rerun. The tricky part is that the Spark scheduler is not aware that the external shuffle service is unavailable: it reschedules the map tasks onto executors on the node whose NM failed, the reduce stage fails with “Fetch failure” again, and after 4 retries the job fails. The same problem can occur on other cluster managers that use the external shuffle service.

So the main problem is that we should avoid assigning tasks to those bad executors (ones whose shuffle service is unavailable). Spark's current blacklist mechanism can blacklist executors/nodes based on failed tasks, but it does not handle this particular fetch-failure scenario. This patch therefore improves the application-level blacklist mechanism to handle fetch failures (especially those caused by an unavailable external shuffle service) by blacklisting the executors/nodes from which shuffle fetches fail; see the configuration sketch below.
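A minimal sketch of the configuration this enables — the first two keys are existing Spark settings, the third is the one added by this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: enable blacklisting, the external shuffle service, and the
// application-level fetch-failure blacklisting introduced by this patch.
val conf = new SparkConf()
  .setAppName("fetch-failure-blacklist-demo")
  .set("spark.blacklist.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.blacklist.application.fetchFailure.enabled", "true")

val sc = new SparkContext(conf)
```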

## How was this patch tested?

Unit tests and verification on a small cluster.

Author: jerryshao <[email protected]>

Closes #17113 from jerryshao/SPARK-13669.
jerryshao authored and Tom Graves committed Jun 26, 2017
1 parent 5282bae commit 9e50a1d
Showing 8 changed files with 186 additions and 38 deletions.
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -149,6 +149,11 @@ package object config {
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED =
ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
.booleanConf
.createWithDefault(false)
// End blacklist confs

private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,6 +61,7 @@ private[scheduler] class BlacklistTracker (
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)

/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -145,6 +146,74 @@
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
a.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}
}

private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
if (!a.killExecutorsOnHost(node)) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
}

def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
if (BLACKLIST_FETCH_FAILURE_ENABLED) {
// If we blacklist on fetch failures, we are implicitly saying that we believe the failure is
// non-transient and can't be recovered from (even though this may be the very first fetch
// failure: a stage is retried after just one fetch failure, so we don't always get a chance
// to collect multiple fetch failures).
// If the external shuffle service is on, then every other executor on this node would
// be suffering from the same issue, so we should blacklist (and potentially kill) all
// of them immediately.

val now = clock.getTimeMillis()
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS

if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (!nodeIdToBlacklistExpiryTime.contains(host)) {
logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")

nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(host)
updateNextExpiryTime()
}
} else if (!executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor $exec due to fetch failure")

executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
// We hard-code the number of task failures to 1 for a fetch failure, because there is
// no task-level reattempt for such failures.
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
updateNextExpiryTime()
killBlacklistedExecutor(exec)

val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
blacklistedExecsOnNode += exec
}
}
}

def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
@@ -174,17 +243,7 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
- if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
- allocationClient match {
- case Some(allocationClient) =>
- logInfo(s"Killing blacklisted executor id $exec " +
- s"since spark.blacklist.killBlacklistedExecutors is set.")
- allocationClient.killExecutors(Seq(exec), true, true)
- case None =>
- logWarning(s"Not attempting to kill blacklisted executor id $exec " +
- s"since allocation client is not defined.")
- }
- }
+ killBlacklistedExecutor(exec)

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +258,7 @@
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
- if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
- allocationClient match {
- case Some(allocationClient) =>
- logInfo(s"Killing all executors on blacklisted host $node " +
- s"since spark.blacklist.killBlacklistedExecutors is set.")
- if (allocationClient.killExecutorsOnHost(node) == false) {
- logError(s"Killing executors on node $node failed.")
- }
- case None =>
- logWarning(s"Not attempting to kill executors on blacklisted host $node " +
- s"since allocation client is not defined.")
- }
- }
+ killExecutorsOnBlacklistedNode(node)
}
}
}
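For context, the blacklisting decisions made in updateBlacklistForFetchFailure are announced on the listener bus, so they can be observed from application code. A hedged sketch using Spark's public SparkListener API (the class name here is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorBlacklisted, SparkListenerNodeBlacklisted}

// Illustrative listener that logs the events posted by the tracker above;
// register it with sc.addSparkListener(new BlacklistLogger).
class BlacklistLogger extends SparkListener {
  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
    println(s"Executor ${event.executorId} blacklisted at ${event.time} " +
      s"after ${event.taskFailures} task failure(s)")

  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
    println(s"Node ${event.hostId} blacklisted at ${event.time} " +
      s"with ${event.executorFailures} blacklisted executor(s)")
}
```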
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,29 +51,21 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
- private[spark] class TaskSchedulerImpl private[scheduler](
+ private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
- private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging {

import TaskSchedulerImpl._

def this(sc: SparkContext) = {
- this(
- sc,
- sc.conf.get(config.MAX_TASK_FAILURES),
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
+ this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
}

- def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
- this(
- sc,
- maxTaskFailures,
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
- isLocal = isLocal)
- }
+ // Lazily initialize blacklistTrackerOpt to avoid capturing an empty
+ // ExecutorAllocationClient, because the ExecutorAllocationClient is created
+ // after this TaskSchedulerImpl.
+ private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)

val conf = sc.conf

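The `lazy val` in the hunk above is what makes the initialization order work: the tracker is not built until first access, by which time the ExecutorAllocationClient exists. A self-contained sketch of the mechanism (hypothetical names, not Spark API):

```scala
// Hypothetical demo: a lazy val is evaluated on first access rather than at
// construction time, so it can observe state that is wired up later.
object LazyInitDemo extends App {
  var allocationClient: Option[String] = None // not yet available

  class Scheduler {
    // Deferred until first read; by then the client has been registered.
    lazy val trackerOpt: Option[String] = allocationClient.map(c => s"tracker($c)")
  }

  val scheduler = new Scheduler()    // allocationClient is still None here
  allocationClient = Some("client")  // created after the scheduler, as in Spark

  println(scheduler.trackerOpt)      // prints Some(tracker(client))
}
```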
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -774,6 +774,12 @@ private[spark] class TaskSetManager(
tasksSuccessful += 1
}
isZombie = true

if (fetchFailed.bmAddress != null) {
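// bmAddress is known to be non-null here: a null address (for example, a fetch
// failure caused by missing map-output metadata rather than a concrete block
// manager) would name no executor or host to blacklist.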
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
}

None

case ef: ExceptionFailure =>
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -530,4 +530,59 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
verify(allocationClientMock).killExecutors(Seq("2"), true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")
}

test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
// on a particular host.
override def answer(invocation: InvocationOnMock): Boolean = {
if (!blacklist.nodeBlacklist.contains("hostA")) {
throw new IllegalStateException("hostA should be on the blacklist")
}
true
}
})

conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)

// Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
conf.set(config.BLACKLIST_KILL_ENABLED, false)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock, never).killExecutors(any(), any(), any())
verify(allocationClientMock, never).killExecutorsOnHost(any())

// Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
conf.set(config.BLACKLIST_KILL_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
clock.advance(1000)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock).killExecutors(Seq("1"), true, true)
verify(allocationClientMock, never).killExecutorsOnHost(any())

assert(blacklist.executorIdToBlacklistStatus.contains("1"))
assert(blacklist.executorIdToBlacklistStatus("1").node === "hostA")
assert(blacklist.executorIdToBlacklistStatus("1").expiryTime ===
1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)

// Enable external shuffle service to see if all the executors on this node will be killed.
conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
clock.advance(1000)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")

verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")

assert(blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
}
}
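The `Answer` stub at the top of this test encodes an ordering check: asserting inside the mock proves the node blacklist was updated before the kill request went out. A self-contained sketch of the pattern with hypothetical types (plain Mockito, not Spark API):

```scala
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

object AnswerOrderingDemo extends App {
  // Hypothetical collaborator standing in for ExecutorAllocationClient.
  trait Client { def killAllOnHost(host: String): Boolean }

  var blacklistedHosts = Set.empty[String]

  val client = mock(classOf[Client])
  when(client.killAllOnHost("hostA")).thenAnswer(new Answer[Boolean] {
    override def answer(invocation: InvocationOnMock): Boolean = {
      // Fails the test if the caller kills executors before blacklisting.
      require(blacklistedHosts.contains("hostA"), "hostA should already be blacklisted")
      true
    }
  })

  blacklistedHosts += "hostA"           // update state first...
  assert(client.killAllOnHost("hostA")) // ...then trigger the mocked call
}
```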
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
conf.set(config.BLACKLIST_ENABLED, true)
sc = new SparkContext(conf)
taskScheduler =
- new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
+ new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -98,6 +98,8 @@
stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
tsmSpy
}

override private[scheduler] lazy val blacklistTrackerOpt = Some(blacklist)
}
setupHelper()
}
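The overridden member in this suite relies on a handy Scala property: a `lazy val` can be overridden by another `lazy val` in a subclass, making it a clean injection point for test doubles. A minimal sketch (hypothetical names):

```scala
// Hypothetical demo of the lazy-val injection pattern used in the suite above.
object LazyOverrideDemo extends App {
  class Component {
    // In production this would construct the real collaborator.
    lazy val dependency: String = sys.error("would touch the real system")
  }

  val testComponent = new Component {
    override lazy val dependency: String = "stubbed"
  }

  // The overriding initializer wins; the real one never runs.
  assert(testComponent.dependency == "stubbed")
  println(testComponent.dependency)
}
```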
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1140,6 +1140,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
}

test("update application blacklist for shuffle-fetch") {
// Set up a task set, and fail one task with a fetch failure.
val conf = new SparkConf()
.set(config.BLACKLIST_ENABLED, true)
.set(config.SHUFFLE_SERVICE_ENABLED, true)
.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
sc = new SparkContext("local", "test", conf)
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
val blacklistTracker = new BlacklistTracker(sc, None)
val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker))

// make some offers to our taskset, to get tasks we will fail
val taskDescs = Seq(
"exec1" -> "host1",
"exec2" -> "host2"
).flatMap { case (exec, host) =>
// offer each executor twice (simulating 2 cores per executor)
(0 until 2).flatMap { _ => tsm.resourceOffer(exec, host, TaskLocality.ANY) }
}
assert(taskDescs.size === 4)

assert(!blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
assert(!blacklistTracker.isNodeBlacklisted("host1"))

// Fail the task with fetch failure
tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))

assert(blacklistTracker.isNodeBlacklisted("host1"))
}

private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
docs/configuration.md — 9 additions & 0 deletions
@@ -1479,6 +1479,15 @@ Apart from these, the following properties are also available, and may be useful
all of the executors on that node will be killed.
</td>
</tr>
<tr>
<td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
<td>false</td>
<td>
(Experimental) If set to "true", Spark will blacklist the executor immediately when a fetch
failure happens. If the external shuffle service is enabled, then the whole node will be
blacklisted.
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
