[YSPARK-586] handle fetch failures due to nodes down better
Thomas Graves committed May 26, 2017
1 parent 8a270a9 commit 6baaf96
Showing 9 changed files with 294 additions and 57 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

// HashMaps for storing mapStatuses and cached serialized statuses in the driver.
// Statuses are dropped only by explicit de-registering.
protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
// Exposed for testing
val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -149,6 +149,16 @@ package object config {
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED =
ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
.booleanConf
.createWithDefault(true)

private[spark] val BLACKLIST_FETCH_FAILURE_MAXFAILURES =
ConfigBuilder("spark.blacklist.application.fetchFailure.maxFailures")
.intConf
.createWithDefault(1)
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
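For context, a hedged sketch of how an application could opt in to the behavior added here. The two spark.blacklist.application.fetchFailure.* keys are the ones introduced in this diff; the remaining settings are pre-existing Spark configuration the feature depends on or interacts with (blacklisting itself, the external shuffle service, and optional executor killing).

import org.apache.spark.SparkConf

// Sketch only: opting in to fetch-failure blacklisting. The application.fetchFailure.*
// keys come from the hunk above; the other keys are standard Spark settings.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.application.fetchFailure.enabled", "true")
  .set("spark.blacklist.application.fetchFailure.maxFailures", "1")
  .set("spark.shuffle.service.enabled", "true")               // required for the new code path
  .set("spark.blacklist.killBlacklistedExecutors", "true")    // optional: also kill executors on blacklisted nodes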
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,6 +61,10 @@ private[scheduler] class BlacklistTracker (
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
private val BLACKLIST_FETCH_FAILURE_MAXFAILURES =
conf.get(config.BLACKLIST_FETCH_FAILURE_MAXFAILURES)


/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -72,7 +76,10 @@
private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]()
val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]()
val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
/**

private val hostToFetchFailed = new HashMap[String, Long]()

/**
* An immutable copy of the set of nodes that are currently blacklisted. Kept in an
* AtomicReference to make [[nodeBlacklist()]] thread-safe.
*/
@@ -145,6 +152,65 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
a.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}
}

private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
if (!a.killExecutorsOnHost(node)) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
}

def updateBlacklistForFetchFailure(host: String): Unit = {
if (BLACKLIST_FETCH_FAILURE_ENABLED) {

// only handle blacklist of fetch failures with external shuffle service
if (conf.getBoolean("spark.shuffle.service.enabled", false) &&
!nodeIdToBlacklistExpiryTime.contains(host)) {

if (hostToFetchFailed.contains(host)) {
hostToFetchFailed(host) += 1
} else {
hostToFetchFailed(host) = 1
}
logDebug("blacklist add fetch failure to host: " + host + " total failures: " +
hostToFetchFailed(host))

if (hostToFetchFailed(host) >= BLACKLIST_FETCH_FAILURE_MAXFAILURES) {
logInfo(s"blacklisting node $host due to to many fetch failures of external shuffle")
val now = clock.getTimeMillis()
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
updateNextExpiryTime()
killExecutorsOnBlacklistedNode(host)
hostToFetchFailed.remove(host)
}
}
}
}

def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
@@ -174,17 +240,7 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
allocationClient.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}
killBlacklistedExecutor(exec)

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +255,7 @@
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
if (allocationClient.killExecutorsOnHost(node) == false) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
killExecutorsOnBlacklistedNode(node)
}
}
}
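A minimal standalone sketch (plain Scala collections, hypothetical names) of the pattern updateBlacklistForFetchFailure follows: count fetch failures per host and, once the count reaches the configured maximum, treat the host as blacklisted. The real method additionally requires the external shuffle service, posts a SparkListenerNodeBlacklisted event, records an expiry time, and may kill the executors on the node.

import scala.collection.mutable

// Hypothetical, simplified model of the per-host fetch-failure counter.
val maxFetchFailures = 1                          // cf. spark.blacklist.application.fetchFailure.maxFailures
val fetchFailuresPerHost = mutable.Map.empty[String, Int]
val blacklistedHosts = mutable.Set.empty[String]

def recordFetchFailure(host: String): Unit = {
  if (!blacklistedHosts.contains(host)) {
    fetchFailuresPerHost(host) = fetchFailuresPerHost.getOrElse(host, 0) + 1
    if (fetchFailuresPerHost(host) >= maxFetchFailures) {
      blacklistedHosts += host
      fetchFailuresPerHost.remove(host)
    }
  }
}

recordFetchFailure("hostA")
assert(blacklistedHosts("hostA"))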
38 changes: 32 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1265,7 +1265,7 @@ class DAGScheduler(
failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
disallowStageRetryForTest

if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
@@ -1297,9 +1297,35 @@
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
if (env.blockManager.externalShuffleServiceEnabled) {

if ((!failedEpoch.contains(bmAddress.executorId) ||
failedEpoch(bmAddress.executorId) < task.epoch)) {
// mark all the executors on that host as failed so we don't do this again
// if another task fails for the same executor
val execsOnHost = mapStage.getExecutorsWithOutputsOnHost(bmAddress.host)
logDebug("executors with outputs on host: " + execsOnHost + " epoch: " + task.epoch)
for (exec <- execsOnHost) {
failedEpoch(exec) = task.epoch
logInfo("Removing outputs for executor: %s (epoch %d)".format(exec, task.epoch))
blockManagerMaster.removeExecutor(exec)
// only mark this mapStage's output on the executor as bad; we could do all stages,
// but that would cost more
mapStage.removeOutputsOnExecutor(exec)
}
mapOutputTracker.registerMapOutputs(
shuffleId,
mapStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}
clearCacheLocs()
}
} else {
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
}
}
}

Expand Down Expand Up @@ -1358,7 +1384,7 @@ class DAGScheduler(
}
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
"(epoch " + currentEpoch + ")")
}
}

@@ -1645,11 +1671,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId, reason) =>
val filesLost = reason match {
val workerLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, filesLost)
dagScheduler.handleExecutorLost(execId, workerLost)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
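A small self-contained sketch (hypothetical names) of the epoch guard used in the DAGScheduler hunk above: an executor's map outputs are cleaned up at most once per failure epoch, so a second fetch failure reported at the same epoch does not redo the work.

import scala.collection.mutable

// Hypothetical stand-in for DAGScheduler.failedEpoch: executorId -> last epoch handled.
val failedEpoch = mutable.Map.empty[String, Long]

def shouldHandle(executorId: String, taskEpoch: Long): Boolean =
  !failedEpoch.contains(executorId) || failedEpoch(executorId) < taskEpoch

// A first report at epoch 5 is handled and recorded; a repeat at the same epoch is skipped.
assert(shouldHandle("exec-1", 5L))
failedEpoch("exec-1") = 5L
assert(!shouldHandle("exec-1", 5L))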
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -17,8 +17,10 @@

package org.apache.spark.scheduler

import org.apache.spark.ShuffleDependency
import scala.collection.mutable.HashSet

import org.apache.spark.rdd.RDD
import org.apache.spark.ShuffleDependency
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite

@@ -119,6 +121,18 @@ private[spark] class ShuffleMapStage(
outputLocs.map(_.headOption.orNull)
}

/**
* Returns a list of executor ids that have outputs on the given host.
*/
def getExecutorsWithOutputsOnHost(host: String): List[String] = {
val executors = new HashSet[String]
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
executors ++= prevList.filter(_.location.host == host).map(_.location.executorId)
}
executors.toList
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
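A self-contained sketch (with a made-up location type standing in for MapStatus/BlockManagerId) of what getExecutorsWithOutputsOnHost computes: the distinct executor ids of every map output whose location is on the given host.

import scala.collection.mutable.HashSet

// Hypothetical per-partition output locations; Loc stands in for a real block manager id.
case class Loc(host: String, executorId: String)
val outputLocs: Array[List[Loc]] = Array(
  List(Loc("hostA", "1"), Loc("hostB", "3")),
  List(Loc("hostA", "2")),
  List(Loc("hostB", "3")))

def executorsWithOutputsOnHost(host: String): List[String] = {
  val executors = new HashSet[String]
  for (partition <- outputLocs.indices) {
    executors ++= outputLocs(partition).filter(_.host == host).map(_.executorId)
  }
  executors.toList
}

assert(executorsWithOutputsOnHost("hostA").toSet == Set("1", "2"))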
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -737,6 +737,10 @@ private[spark] class TaskSetManager(
tasksSuccessful += 1
}
isZombie = true

if (fetchFailed.bmAddress != null) {
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(fetchFailed.bmAddress.host))
}
None

case ef: ExceptionFailure =>
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -41,6 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
override def beforeEach(): Unit = {
conf = new SparkConf().setAppName("test").setMaster("local")
.set(config.BLACKLIST_ENABLED.key, "true")
conf.set("spark.authenticate", "false")
scheduler = mockTaskSchedWithConf(conf)

clock.setTime(0)
@@ -529,4 +530,37 @@
verify(allocationClientMock).killExecutors(Seq("2"), true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")
}

test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
// on a particular host.
override def answer(invocation: InvocationOnMock): Boolean = {
if (!blacklist.nodeBlacklist.contains("hostA")) {
throw new IllegalStateException("hostA should be on the blacklist")
}
true
}
})

conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)

// Disable auto-kill. Mark a fetch failure on hostA and make sure killExecutorsOnHost is not called.
conf.set(config.BLACKLIST_KILL_ENABLED, false)
blacklist.updateBlacklistForFetchFailure("hostA")

verify(allocationClientMock, never).killExecutorsOnHost(any())

// Enable auto-kill.
conf.set(config.BLACKLIST_KILL_ENABLED, true)
// Enable external shuffle service to see if all the executors on this node will be killed.
conf.set("spark.shuffle.service.enabled", "true")
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
blacklist.updateBlacklistForFetchFailure("hostA")

verify(allocationClientMock).killExecutorsOnHost("hostA")
}
}