From 6baaf968c1aa28ce105918bbe1d094ca10112fe3 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 26 May 2017 17:31:36 +0000 Subject: [PATCH] [YSPARK-586] handle fetch failures due to nodes down better --- .../org/apache/spark/MapOutputTracker.scala | 3 +- .../spark/internal/config/package.scala | 10 ++ .../spark/scheduler/BlacklistTracker.scala | 94 ++++++++++++++----- .../apache/spark/scheduler/DAGScheduler.scala | 38 ++++++-- .../spark/scheduler/ShuffleMapStage.scala | 16 +++- .../spark/scheduler/TaskSetManager.scala | 4 + .../scheduler/BlacklistTrackerSuite.scala | 34 +++++++ .../spark/scheduler/DAGSchedulerSuite.scala | 67 +++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 85 ++++++++++++----- 9 files changed, 294 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6f5c31d7ab71c..3a6f60a585708 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, // HashMaps for storing mapStatuses and cached serialized statuses in the driver. // Statuses are dropped only by explicit de-registering. - protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala + // Exposed for testing + val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5ce65200660e1..54647277b537b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -149,6 +149,16 @@ package object config { .internal() .timeConf(TimeUnit.MILLISECONDS) .createOptional + + private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED = + ConfigBuilder("spark.blacklist.application.fetchFailure.enabled") + .booleanConf + .createWithDefault(true) + + private[spark] val BLACKLIST_FETCH_FAILURE_MAXFAILURES = + ConfigBuilder("spark.blacklist.application.fetchFailure.maxFailures") + .intConf + .createWithDefault(1) // End blacklist confs private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index e130e609e4f63..3cc919d6cd61b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -61,6 +61,10 @@ private[scheduler] class BlacklistTracker ( private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED) + private val BLACKLIST_FETCH_FAILURE_MAXFAILURES = + conf.get(config.BLACKLIST_FETCH_FAILURE_MAXFAILURES) + /** * A map from executorId to information on task failures. 
Tracks the time of each task failure,
@@ -72,7 +76,10 @@ private[scheduler] class BlacklistTracker (
   private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]()
   val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]()
   val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
-  /**
+
+  private val hostToFetchFailed = new HashMap[String, Long]()
+
+  /**
    * An immutable copy of the set of nodes that are currently blacklisted. Kept in an
    * AtomicReference to make [[nodeBlacklist()]] thread-safe.
    */
@@ -145,6 +152,65 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killBlacklistedExecutor(exec: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing blacklisted executor id $exec " +
+            s"since spark.blacklist.killBlacklistedExecutors is set.")
+          a.killExecutors(Seq(exec), true, true)
+        case None =>
+          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  private def killExecutorsOnBlacklistedNode(node: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing all executors on blacklisted host $node " +
+            s"since spark.blacklist.killBlacklistedExecutors is set.")
+          if (!a.killExecutorsOnHost(node)) {
+            logError(s"Killing executors on node $node failed.")
+          }
+        case None =>
+          logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  def updateBlacklistForFetchFailure(host: String): Unit = {
+    if (BLACKLIST_FETCH_FAILURE_ENABLED) {
+
+      // only blacklist for fetch failures when the external shuffle service is enabled
+      if (conf.getBoolean("spark.shuffle.service.enabled", false) &&
+          !nodeIdToBlacklistExpiryTime.contains(host)) {
+
+        if (hostToFetchFailed.contains(host)) {
+          hostToFetchFailed(host) += 1
+        } else {
+          hostToFetchFailed(host) = 1
+        }
+        logDebug("blacklist add fetch failure to host: " + host + " total failures: " +
+          hostToFetchFailed(host))
+
+        if (hostToFetchFailed(host) >= BLACKLIST_FETCH_FAILURE_MAXFAILURES) {
+          logInfo(s"blacklisting node $host due to too many external shuffle fetch failures")
+          val now = clock.getTimeMillis()
+          val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+          nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
+          listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
+          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+          updateNextExpiryTime()
+          killExecutorsOnBlacklistedNode(host)
+          hostToFetchFailed.remove(host)
+        }
+      }
+    }
+  }
 
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
@@ -174,17 +240,7 @@ private[scheduler] class BlacklistTracker (
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
         executorIdToFailureList.remove(exec)
         updateNextExpiryTime()
-        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-          allocationClient match {
-            case Some(allocationClient) =>
-              logInfo(s"Killing blacklisted executor id $exec " +
-                s"since spark.blacklist.killBlacklistedExecutors is set.")
-              allocationClient.killExecutors(Seq(exec), true, true)
-            case None =>
-              logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-                s"since allocation client is not defined.")
-          }
-        }
+        killBlacklistedExecutor(exec)
 
         // In addition to blacklisting the executor, we also update the data for failures on
the // node, and potentially put the entire node into a blacklist as well. @@ -199,19 +255,7 @@ private[scheduler] class BlacklistTracker ( nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) - if (conf.get(config.BLACKLIST_KILL_ENABLED)) { - allocationClient match { - case Some(allocationClient) => - logInfo(s"Killing all executors on blacklisted host $node " + - s"since spark.blacklist.killBlacklistedExecutors is set.") - if (allocationClient.killExecutorsOnHost(node) == false) { - logError(s"Killing executors on node $node failed.") - } - case None => - logWarning(s"Not attempting to kill executors on blacklisted host $node " + - s"since allocation client is not defined.") - } - } + killExecutorsOnBlacklistedNode(node) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 758e4a11bdc8d..4fa8f8bd18184 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1265,7 +1265,7 @@ class DAGScheduler( failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + disallowStageRetryForTest if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { @@ -1297,9 +1297,35 @@ class DAGScheduler( mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) } - // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) + if (env.blockManager.externalShuffleServiceEnabled) { + + if ((!failedEpoch.contains(bmAddress.executorId) || + failedEpoch(bmAddress.executorId) < task.epoch)) { + // mark all the executors on that host as failed so we don't do this again + // if another task failed for same executor + val execsOnHost = mapStage.getExecutorsWithOutputsOnHost(bmAddress.host) + logDebug("executors on host is: " + execsOnHost + " epoch: " + task.epoch) + for (exec <- execsOnHost) { + failedEpoch(exec) = task.epoch + logInfo("Removing outputs for executor: %s (epoch %d)".format(exec, task.epoch)) + blockManagerMaster.removeExecutor(exec) + // only mark this mapStage output on the executor as bad.. 
could do all stages + // but would cost more + mapStage.removeOutputsOnExecutor(exec) + } + mapOutputTracker.registerMapOutputs( + shuffleId, + mapStage.outputLocInMapOutputTrackerFormat(), + changeEpoch = true) + if (shuffleIdToMapStage.isEmpty) { + mapOutputTracker.incrementEpoch() + } + clearCacheLocs() + } + } else { + handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) + } } } @@ -1358,7 +1384,7 @@ class DAGScheduler( } } else { logDebug("Additional executor lost message for " + execId + - "(epoch " + currentEpoch + ")") + "(epoch " + currentEpoch + ")") } } @@ -1645,11 +1671,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => - val filesLost = reason match { + val workerLost = reason match { case SlaveLost(_, true) => true case _ => false } - dagScheduler.handleExecutorLost(execId, filesLost) + dagScheduler.handleExecutorLost(execId, workerLost) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index 51416e5ce97fc..ca8c812e8c557 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -17,8 +17,10 @@ package org.apache.spark.scheduler -import org.apache.spark.ShuffleDependency +import scala.collection.mutable.HashSet + import org.apache.spark.rdd.RDD +import org.apache.spark.ShuffleDependency import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite @@ -119,6 +121,18 @@ private[spark] class ShuffleMapStage( outputLocs.map(_.headOption.orNull) } + /** + * Returns a list of executorids that have outputs on the host passed in. + */ + def getExecutorsWithOutputsOnHost(host: String): List[String] = { + val executors = new HashSet[String] + for (partition <- 0 until numPartitions) { + val prevList = outputLocs(partition) + executors ++= prevList.filter(_.location.host == host).map((s) => s.location.executorId) + } + executors.toList + } + /** * Removes all shuffle outputs associated with this executor. 
Note that this will also remove * outputs which are served by an external shuffle server (if one exists), as they are still diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3756c216f5ecb..88dd0c884d555 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -737,6 +737,10 @@ private[spark] class TaskSetManager( tasksSuccessful += 1 } isZombie = true + + if (fetchFailed.bmAddress != null) { + blacklistTracker.foreach(_.updateBlacklistForFetchFailure(fetchFailed.bmAddress.host)) + } None case ef: ExceptionFailure => diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 2b18ebee79a2b..69a296e145dea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -41,6 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M override def beforeEach(): Unit = { conf = new SparkConf().setAppName("test").setMaster("local") .set(config.BLACKLIST_ENABLED.key, "true") + conf.set("spark.authenticate", "false") scheduler = mockTaskSchedWithConf(conf) clock.setTime(0) @@ -529,4 +530,37 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M verify(allocationClientMock).killExecutors(Seq("2"), true, true) verify(allocationClientMock).killExecutorsOnHost("hostA") } + + test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") { + val allocationClientMock = mock[ExecutorAllocationClient] + when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] { + // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist + // is updated before we ask the executor allocation client to kill all the executors + // on a particular host. + override def answer(invocation: InvocationOnMock): Boolean = { + if (blacklist.nodeBlacklist.contains("hostA") == false) { + throw new IllegalStateException("hostA should be on the blacklist") + } + true + } + }) + + conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true) + blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called. + conf.set(config.BLACKLIST_KILL_ENABLED, false) + blacklist.updateBlacklistForFetchFailure("hostA") + + verify(allocationClientMock, never).killExecutorsOnHost(any()) + + // Enable auto-kill. + conf.set(config.BLACKLIST_KILL_ENABLED, true) + // Enable external shuffle service to see if all the executors on this node will be killed. 
+ conf.set("spark.shuffle.service.enabled", "true") + blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + blacklist.updateBlacklistForFetchFailure("hostA") + + verify(allocationClientMock).killExecutorsOnHost("hostA") + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 79ffc69c37e03..feec6ec86e221 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -104,6 +104,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou import DAGSchedulerSuite._ val conf = new SparkConf + conf.set("spark.authenticate", "false") /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() @@ -208,6 +209,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } private def init(testConf: SparkConf): Unit = { + testConf.set("spark.authenticate", "false") sc = new SparkContext("local", "DAGSchedulerSuite", testConf) sparkListener.submittedStageInfos.clear() sparkListener.successfulStages.clear() @@ -395,6 +397,70 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("shuffle files on the host should be removed when fetch fails") { + // reset the test context with the right shuffle service config + afterEach() + val conf = new SparkConf() + conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.authenticate", "false") + init(conf) + runEvent(ExecutorAdded("exec-hostA1", "hostA")) + runEvent(ExecutorAdded("exec-hostA2", "hostA")) + runEvent(ExecutorAdded("exec-hostB", "hostB")) + val firstRDD = new MyRDD(sc, 3, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + // map stage1 completes successfully, with one task on each executor + complete(taskSets(0), Seq( + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, makeMapStatus("hostB", 1)) + )) + // map stage2 completes successfully, with one task on each executor + complete(taskSets(1), Seq( + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, makeMapStatus("hostB", 1)) + )) + // make sure our test setup is correct + val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get + assert(initialMapStatus1.count(_ != null) === 3) + assert(initialMapStatus1.map{_.location.executorId}.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + + val initialMapStatus2 = mapOutputTracker.mapStatuses.get(1).get + assert(initialMapStatus2.count(_ != null) === 3) + assert(initialMapStatus2.map{_.location.executorId}.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + + // reduce stage fails with a fetch failure from one host + complete(taskSets(2), Seq( + 
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"),
+        null)
+    ))
+
+    // Here is the main assertion -- make sure that we de-register
+    // the map outputs for both map stages from both executors on hostA
+    val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+    assert(mapStatus1.count(_ != null) === 1)
+    assert(mapStatus1(2).location.executorId === "exec-hostB")
+    assert(mapStatus1(2).location.host === "hostB")
+
+    // we are only invalidating the immediate map output, other parents aren't
+    val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get
+    assert(mapStatus2.count(_ != null) === 3)
+    assert(mapStatus2(2).location.executorId === "exec-hostB")
+    assert(mapStatus2(2).location.host === "hostB")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
@@ -646,6 +712,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     afterEach()
     val conf = new SparkConf()
     conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+    conf.set("spark.authenticate", "false")
     init(conf)
     assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d03a0c990a02b..f58f4438446b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -156,6 +156,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
 
   private val conf = new SparkConf
+  conf.set("spark.authenticate", "false")
 
   val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
   val MAX_TASK_FAILURES = 4
@@ -179,7 +180,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
   test("TaskSet with no preferences") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
@@ -198,7 +199,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("multiple offers with no preferences") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -232,7 +233,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("skip unsatisfiable locality levels") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
     val clock = new ManualClock
@@ -248,7 +249,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("basic delay scheduling") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1", "exec1")),
@@ -278,7 +279,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("we do not need to
delay scheduling when we only have noPref tasks in the queue") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2")) val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1", "exec1")), @@ -295,7 +296,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("delay scheduling with fallback") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(5, @@ -335,7 +336,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("delay scheduling with failed hosts") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(3, @@ -372,7 +373,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("task result lost") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock @@ -389,7 +390,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("repeated failures lead to task set abortion") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock @@ -418,6 +419,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay). // don't wait to jump locality levels in this test set("spark.locality.wait", "0") + conf.set("spark.authenticate", "false") sc = new SparkContext("local", "test", conf) // two executors on same host, one on different. 
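For reference alongside the suites above: the fetch-failure tests in BlacklistTrackerSuite and DAGSchedulerSuite exercise the per-host bookkeeping that updateBlacklistForFetchFailure performs. The following is a minimal, standalone Scala sketch of that rule only, assuming the external shuffle service is enabled; the names FetchFailureBook, recordFetchFailure, and maxFailures are hypothetical and not part of this patch, they simply mirror hostToFetchFailed and BLACKLIST_FETCH_FAILURE_MAXFAILURES.

// Illustrative only -- not Spark code. A toy model of the counting/threshold rule
// applied by BlacklistTracker.updateBlacklistForFetchFailure in this patch.
import scala.collection.mutable

class FetchFailureBook(maxFailures: Int) {
  private val failuresPerHost = mutable.HashMap.empty[String, Long]
  private val blacklistedHosts = mutable.HashSet.empty[String]

  /** Record one fetch failure against `host`; blacklist it once the threshold is reached. */
  def recordFetchFailure(host: String): Unit = {
    if (!blacklistedHosts.contains(host)) {
      val total = failuresPerHost.getOrElse(host, 0L) + 1
      failuresPerHost(host) = total
      if (total >= maxFailures) {
        blacklistedHosts += host     // corresponds to nodeIdToBlacklistExpiryTime.put(...)
        failuresPerHost.remove(host) // corresponds to hostToFetchFailed.remove(host)
      }
    }
  }

  def isBlacklisted(host: String): Boolean = blacklistedHosts.contains(host)
}

// With the default maxFailures of 1, a single fetch failure is enough:
// val book = new FetchFailureBook(maxFailures = 1)
// book.recordFetchFailure("hostA")
// assert(book.isBlacklisted("hostA"))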
@@ -511,7 +513,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("new executors get added and lost") { // Assign host2 to rack2 FakeRackUtil.assignHostToRack("host2", "rack2") - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "execA")), @@ -543,7 +545,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("Executors exit for reason unrelated to currently running tasks") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1", "execA")), @@ -576,7 +578,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg FakeRackUtil.assignHostToRack("host2", "rack1") // Assign host3 to rack2 FakeRackUtil.assignHostToRack("host3", "rack2") - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(2, @@ -599,7 +601,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("do not emit warning when serialized task is small") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -612,7 +614,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("emit warning when serialized task is large") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) @@ -626,7 +628,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("Not serializable exception thrown if the task cannot be serialized") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = new TaskSet( @@ -641,6 +643,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("abort the job if total size of results is too large") { val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") + conf.set("spark.authenticate", "false") sc = new SparkContext("local", "test", conf) def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) => @@ -665,7 +668,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("speculative and noPref task should be scheduled after node-local") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(4, @@ -693,7 +696,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("node-local tasks should be scheduled right away " + "when there are only node-local and no-preference tasks") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler( sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet 
= FakeTask.createTaskSet(4, @@ -716,7 +719,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) val taskSet = FakeTask.createTaskSet(4, Seq(TaskLocation("host1")), @@ -737,7 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) val taskSet = FakeTask.createTaskSet(3, Seq(), @@ -758,7 +761,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Ensure TaskSetManager is usable after addition of levels") { // Regression test for SPARK-2931 - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(2, Seq(TaskLocation("host1", "execA")), @@ -790,7 +793,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") { // Regression test for SPARK-2931 - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) val taskSet = FakeTask.createTaskSet(3, @@ -820,7 +823,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("Kill other task attempts when one attempt belonging to the same task succeeds") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately @@ -873,7 +876,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("Killing speculative tasks does not count towards aborting the taskset") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(5) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately @@ -981,7 +984,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock) @@ -1004,6 +1007,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // and killed task. val conf = new SparkConf(). 
set(config.BLACKLIST_ENABLED, true) + conf.set("spark.authenticate", "false") sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) @@ -1039,6 +1043,39 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg .updateBlacklistForFailedTask(anyString(), anyString(), anyInt()) } + test("update application blacklist for shuffle-fetch") { + // Setup a taskset, and fail some one task for fetch failure. + val conf = new SparkConf() + .set(config.BLACKLIST_ENABLED, true) + .set(config.SHUFFLE_SERVICE_ENABLED, true) + .set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true) + conf.set("spark.authenticate", "false") + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + val blacklistTracker = new BlacklistTracker(sc, None) + val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker)) + + // make some offers to our taskset, to get tasks we will fail + val taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 4) + + assert(!blacklistTracker.isNodeBlacklisted("host1")) + + // Fail the task with fetch failure + tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED, + FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored")) + + assert(blacklistTracker.isNodeBlacklisted("host1")) + } + + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
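To exercise the behavior this patch adds end to end, a minimal sketch of the driver-side configuration is shown below. This is not part of the patch; the fetch-failure keys are the ones defined above, node blacklisting on fetch failure only applies when the external shuffle service is enabled, and killing the executors on a blacklisted node additionally requires spark.blacklist.killBlacklistedExecutors. The wrapper object is a hypothetical name used only to make the snippet compile on its own.

import org.apache.spark.SparkConf

object FetchFailureBlacklistConf {
  // Sketch of the settings relevant to fetch-failure blacklisting (assumed usage, not
  // prescribed by the patch).
  val conf: SparkConf = new SparkConf()
    .set("spark.blacklist.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.blacklist.application.fetchFailure.enabled", "true")  // default: true
    .set("spark.blacklist.application.fetchFailure.maxFailures", "1") // default: 1
    .set("spark.blacklist.killBlacklistedExecutors", "true")          // optional
}

The default of one failure reflects the reasoning behind the change: when map output is served by the external shuffle service, a fetch failure generally indicates the node's shuffle files are unavailable, so a single failure is treated as sufficient evidence to blacklist the host.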