From 1dd3eb5b849bb250f4251730c1afa722757b706b Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 25 Aug 2014 22:38:42 -0700
Subject: [PATCH 1/5] [SPARK-3224] FetchFailed reduce stages should only show up once in the failed stages UI.

---
 .../apache/spark/scheduler/DAGScheduler.scala | 52 +++++++++++--------
 1 file changed, 29 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 34131984570e4..3551e7c18229e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1045,31 +1045,37 @@ class DAGScheduler(
         stage.pendingTasks += task
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
-        // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
-        markStageAsFinished(failedStage, Some("Fetch failure"))
-        runningStages -= failedStage
-        // TODO: Cancel running tasks in the stage
-        logInfo("Marking " + failedStage + " (" + failedStage.name +
-          ") for resubmision due to a fetch failure")
-        // Mark the map whose fetch failed as broken in the map stage
-        val mapStage = shuffleToMapStage(shuffleId)
-        if (mapId != -1) {
-          mapStage.removeOutputLoc(mapId, bmAddress)
-          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
-        }
-        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
-          "); marking it for resubmission")
-        if (failedStages.isEmpty && eventProcessActor != null) {
-          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-          // in that case the event will already have been scheduled. eventProcessActor may be
-          // null during unit tests.
-          import env.actorSystem.dispatcher
-          env.actorSystem.scheduler.scheduleOnce(
-            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        // It is likely that we receive multiple FetchFailed for a single stage (because we have
+        // multiple tasks running concurrently on different executors). In that case, it is possible
+        // the fetch failure has already been handled by the executor.
+        if (runningStages.contains(failedStage)) {
+          markStageAsFinished(failedStage, Some("Fetch failure"))
+          runningStages -= failedStage
+          // TODO: Cancel running tasks in the stage
+          logInfo("Marking " + failedStage + " (" + failedStage.name +
+            ") for resubmision due to a fetch failure")
+
+          // Mark the map whose fetch failed as broken in the map stage
+          val mapStage = shuffleToMapStage(shuffleId)
+          if (mapId != -1) {
+            mapStage.removeOutputLoc(mapId, bmAddress)
+            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+          }
+
+          logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
+            "); marking it for resubmission")
+          if (failedStages.isEmpty && eventProcessActor != null) {
+            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+            // in that case the event will already have been scheduled. eventProcessActor may be
+            // null during unit tests.
+            import env.actorSystem.dispatcher
+            env.actorSystem.scheduler.scheduleOnce(
+              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+          }
+          failedStages += failedStage
+          failedStages += mapStage
         }
-        failedStages += failedStage
-        failedStages += mapStage
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))

From 3d3d3565f82f3551c2bd7c6eb6013b551179729c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 26 Aug 2014 15:21:02 -0700
Subject: [PATCH 2/5] Remove map output loc even for repeated FetchFaileds.

---
 .../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3551e7c18229e..44744a6d4e596 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1046,6 +1046,7 @@ class DAGScheduler(
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         val failedStage = stageIdToStage(task.stageId)
+        val mapStage = shuffleToMapStage(shuffleId)
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is possible
         // the fetch failure has already been handled by the executor.
@@ -1056,13 +1057,6 @@ class DAGScheduler(
           logInfo("Marking " + failedStage + " (" + failedStage.name +
             ") for resubmision due to a fetch failure")
 
-          // Mark the map whose fetch failed as broken in the map stage
-          val mapStage = shuffleToMapStage(shuffleId)
-          if (mapId != -1) {
-            mapStage.removeOutputLoc(mapId, bmAddress)
-            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
-          }
-
           logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
             "); marking it for resubmission")
           if (failedStages.isEmpty && eventProcessActor != null) {
@@ -1076,6 +1070,13 @@ class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
         }
+
+        // Mark the map whose fetch failed as broken in the map stage
+        if (mapId != -1) {
+          mapStage.removeOutputLoc(mapId, bmAddress)
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))

From 796d282a292f3c3bae36ffc2d247585b15695e24 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Tue, 26 Aug 2014 17:12:27 -0700
Subject: [PATCH 3/5] Added unit test for SPARK-3224

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 41 ++++++++++++++++++-
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index bd829752eb401..f5fed988ade24 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.{HashSet, HashMap, Map}
+import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
 import akka.actor._
@@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val WAIT_TIMEOUT_MILLIS = 10000
   val sparkListener = new SparkListener() {
     val successfulStages = new HashSet[Int]()
-    val failedStages = new HashSet[Int]()
+    val failedStages = new ArrayBuffer[Int]()
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
       if (stageInfo.failureReason.isEmpty) {
@@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("trivial shuffle with multiple fetch failures") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+    // The MapOutputTracker should know about both map output locations.
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
+      Array("hostA", "hostB"))
+
+    // The first result task fails, with a fetch failure for the output from the first mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+
+    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    // The SparkListener should not receive redundant failure events.
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.size == 1)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

From 49282b3575df62717f4795b66925fb5099079d63 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 26 Aug 2014 19:10:03 -0700
Subject: [PATCH 4/5] Kay's feedback.

---
 .../apache/spark/scheduler/DAGScheduler.scala | 30 +++++++++----------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 44744a6d4e596..9a6e1b8bcc579 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1049,27 +1049,25 @@ class DAGScheduler(
         val mapStage = shuffleToMapStage(shuffleId)
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is possible
-        // the fetch failure has already been handled by the executor.
+        // the fetch failure has already been handled by the scheduler.
         if (runningStages.contains(failedStage)) {
           markStageAsFinished(failedStage, Some("Fetch failure"))
           runningStages -= failedStage
           // TODO: Cancel running tasks in the stage
-          logInfo("Marking " + failedStage + " (" + failedStage.name +
-            ") for resubmision due to a fetch failure")
-
-          logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
-            "); marking it for resubmission")
-          if (failedStages.isEmpty && eventProcessActor != null) {
-            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-            // in that case the event will already have been scheduled. eventProcessActor may be
-            // null during unit tests.
-            import env.actorSystem.dispatcher
-            env.actorSystem.scheduler.scheduleOnce(
-              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
-          }
-          failedStages += failedStage
-          failedStages += mapStage
+          logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
+            s"due to a fetch failure from $mapStage (${mapStage.name}")
+        }
+
+        if (failedStages.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
         }
+        failedStages += failedStage
+        failedStages += mapStage
 
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {

From effb1ce35857d04b03485dc37ff8064e835d4911 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 26 Aug 2014 21:55:44 -0700
Subject: [PATCH 5/5] Move log message.

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9a6e1b8bcc579..5a7061b517b6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1046,22 +1046,22 @@ class DAGScheduler(
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         val failedStage = stageIdToStage(task.stageId)
-        val mapStage = shuffleToMapStage(shuffleId)
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is possible
         // the fetch failure has already been handled by the scheduler.
         if (runningStages.contains(failedStage)) {
           markStageAsFinished(failedStage, Some("Fetch failure"))
           runningStages -= failedStage
-          // TODO: Cancel running tasks in the stage
-          logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
-            s"due to a fetch failure from $mapStage (${mapStage.name}")
         }
 
+        val mapStage = shuffleToMapStage(shuffleId)
         if (failedStages.isEmpty && eventProcessActor != null) {
           // Don't schedule an event to resubmit failed stages if failed isn't empty, because
           // in that case the event will already have been scheduled. eventProcessActor may be
           // null during unit tests.
+          // TODO: Cancel running tasks in the stage
+          logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
+            s"due to a fetch failure from $mapStage (${mapStage.name}")
           import env.actorSystem.dispatcher
           env.actorSystem.scheduler.scheduleOnce(
             RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
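
Note: the following is an illustrative, self-contained Scala sketch of the behaviour the patch series converges on; it is not part of the patches and not the actual DAGScheduler code. It shows the two-part handling: when several FetchFailed events arrive for the same reduce stage, only the first one (while the stage is still in the running set) marks the stage as failed, so listeners and the UI see a single failure, while the lost map output location is dropped for every event. The Stage, event, and tracker types below are simplified stand-ins introduced only for this example.

import scala.collection.mutable

object FetchFailureDedupSketch {

  // Simplified stand-ins for Stage, the FetchFailed completion event, and the
  // map output tracker used by the real scheduler.
  case class Stage(id: Int, name: String)
  case class FetchFailedEvent(stageId: Int, shuffleId: Int, mapId: Int)

  val stages = Map(0 -> Stage(0, "map stage"), 1 -> Stage(1, "reduce stage"))
  val runningStages = mutable.Set(stages(1))
  val failedStages = mutable.ArrayBuffer.empty[Stage]
  // (shuffleId, mapId) pairs whose output location has been unregistered.
  val unregistered = mutable.Set.empty[(Int, Int)]

  def handleFetchFailed(event: FetchFailedEvent): Unit = {
    val failedStage = stages(event.stageId)
    val mapStage = stages(0) // stand-in for shuffleToMapStage(event.shuffleId)

    // Only the first FetchFailed for a still-running stage marks it as failed,
    // so listeners (and the failed-stages UI built on them) see one failure.
    if (runningStages.contains(failedStage)) {
      runningStages -= failedStage
      failedStages += failedStage
      failedStages += mapStage
      println(s"Marking ${failedStage.name} for resubmission due to a fetch " +
        s"failure from ${mapStage.name}")
    }

    // The lost map output is removed on every event, even repeated ones, so a
    // resubmitted map stage regenerates every output that is actually gone.
    if (event.mapId != -1) {
      unregistered += ((event.shuffleId, event.mapId))
    }
  }

  def main(args: Array[String]): Unit = {
    handleFetchFailed(FetchFailedEvent(stageId = 1, shuffleId = 0, mapId = 0))
    handleFetchFailed(FetchFailedEvent(stageId = 1, shuffleId = 0, mapId = 1))
    // The reduce stage is recorded as failed exactly once ...
    assert(failedStages.count(_.id == 1) == 1)
    // ... but both lost map outputs were unregistered.
    assert(unregistered == Set((0, 0), (0, 1)))
  }
}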