From 6e6f748561c5c3b8fa08796d39553b83216a2f29 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 10:24:41 -0800 Subject: [PATCH 01/19] [SPARK-4879] Use the Spark driver to authorize Hadoop commits. Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is turned on, sometimes this can result in multiple tasks committing their output to the same partition. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see the aforementioned JIRA ticket. In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing. This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state. --- .../scala/org/apache/spark/SparkEnv.scala | 10 +- .../org/apache/spark/SparkHadoopWriter.scala | 31 +++- .../apache/spark/scheduler/DAGScheduler.scala | 13 +- .../spark/scheduler/DAGSchedulerEvent.scala | 1 + .../scheduler/OutputCommitCoordinator.scala | 173 ++++++++++++++++++ 5 files changed, 215 insertions(+), 13 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 4d418037bd33f..651b2cbdfcca0 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -34,7 +34,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.network.nio.NioBlockTransferService -import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager} import org.apache.spark.storage._ @@ -67,6 +67,7 @@ class SparkEnv ( val sparkFilesDir: String, val metricsSystem: MetricsSystem, val shuffleMemoryManager: ShuffleMemoryManager, + val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { private[spark] var isStopped = false @@ -86,6 +87,7 @@ class SparkEnv ( blockManager.stop() blockManager.master.stop() metricsSystem.stop() + outputCommitCoordinator.stop() actorSystem.shutdown() // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // down, but let's call it anyway in case it gets 
fixed in a later release @@ -346,6 +348,11 @@ object SparkEnv extends Logging { "levels using the RDD.persist() method instead.") } + val outputCommitCoordinator = new OutputCommitCoordinator + val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", + OutputCommitCoordinator.createActor(outputCommitCoordinator)) + outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor + new SparkEnv( executorId, actorSystem, @@ -362,6 +369,7 @@ object SparkEnv extends Logging { sparkFilesDir, metricsSystem, shuffleMemoryManager, + outputCommitCoordinator, conf) } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 40237596570de..f238a17b464db 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.HadoopRDD +import org.apache.spark.util.AkkaUtils /** * Internal helper class that saves an RDD using a Hadoop OutputFormat. @@ -44,6 +45,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private val now = new Date() private val conf = new SerializableWritable(jobConf) + private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator + private var jobID = 0 private var splitID = 0 private var attemptID = 0 @@ -106,18 +109,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val taCtxt = getTaskContext() val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { - try { - cmtr.commitTask(taCtxt) - logInfo (taID + ": Committed") - } catch { - case e: IOException => { - logError("Error committing the output of task: " + taID.value, e) - cmtr.abortTask(taCtxt) - throw e + val conf = SparkEnv.get.conf + val timeout = AkkaUtils.askTimeout(conf) + val maxAttempts = AkkaUtils.numRetries(conf) + val retryInterval = AkkaUtils.retryWaitMs(conf) + val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID, + maxAttempts, retryInterval, timeout) + if (canCommit) { + try { + cmtr.commitTask(taCtxt) + logInfo (s"$taID: Committed") + } catch { + case e: IOException => { + logError("Error committing the output of task: " + taID.value, e) + cmtr.abortTask(taCtxt) + throw e + } } + } else { + logInfo(s"$taID: Not committed because DAGScheduler did not authorize commit") } } else { - logInfo ("No need to commit output of task: " + taID.value) + logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 1cfe98673773a..165c0a8f17c4d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ -import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils} +import org.apache.spark.util._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat /** @@ -63,7 +63,7 @@ class DAGScheduler( mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv, - clock: Clock = SystemClock) 
+ clock: org.apache.spark.util.Clock = SystemClock) extends Logging { def this(sc: SparkContext, taskScheduler: TaskScheduler) = { @@ -113,7 +113,6 @@ class DAGScheduler( // This is only safe because DAGScheduler runs in a single thread. private val closureSerializer = SparkEnv.get.closureSerializer.newInstance() - /** If enabled, we may run certain actions like take() and first() locally. */ private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false) @@ -126,6 +125,8 @@ class DAGScheduler( private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator + // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { eventProcessLoop.post(BeginEvent(task, taskInfo)) @@ -808,6 +809,7 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) + outputCommitCoordinator.stageStart(stage.id) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. @@ -865,6 +867,7 @@ class DAGScheduler( } else { // Because we posted SparkListenerStageSubmitted earlier, we should post // SparkListenerStageCompleted here in case there are no tasks to run. + outputCommitCoordinator.stageEnd(stage.id) listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) @@ -908,6 +911,9 @@ class DAGScheduler( val task = event.task val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) + val isSuccess = event.reason == Success + + outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.taskId, isSuccess) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. @@ -1367,6 +1373,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ResubmitFailedStages => dagScheduler.resubmitFailedStages() + } override def onError(e: Throwable): Unit = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 2b6f7e4205c32..b3ae465b03210 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -77,3 +77,4 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent + diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala new file mode 100644 index 0000000000000..cae7f38ac6e52 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import akka.actor.{PoisonPill, ActorRef, Actor} +import org.apache.spark.Logging +import org.apache.spark.util.{AkkaUtils, ActorLogReceive} + +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration + +private[spark] sealed trait OutputCommitCoordinationMessage + +private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage +private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage + +private[spark] case class AskPermissionToCommitOutput( + stage: Int, + task: Long, + taskAttempt: Long) + extends OutputCommitCoordinationMessage + +private[spark] case class TaskCompleted( + stage: Int, + task: Long, + attempt: Long, + successful: Boolean) + extends OutputCommitCoordinationMessage + +/** + * Authority that decides whether tasks can commit output to HDFS. + * + * This lives on the driver, but the actor allows the tasks that commit + * to Hadoop to invoke it. + */ +private[spark] class OutputCommitCoordinator extends Logging { + + // Initialized by SparkEnv + var coordinatorActor: ActorRef = _ + + // TODO: handling stage attempt ids? + private type StageId = Int + private type TaskId = Long + private type TaskAttemptId = Long + + private val authorizedCommittersByStage: + mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() + + def stageStart(stage: StageId) { + coordinatorActor ! StageStarted(stage) + } + def stageEnd(stage: StageId) { + coordinatorActor ! StageEnded(stage) + } + + def canCommit( + stage: StageId, + task: TaskId, + attempt: TaskAttemptId, + timeout: FiniteDuration): Boolean = { + AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), + coordinatorActor, timeout) + } + + def canCommit( + stage: StageId, + task: TaskId, + attempt: TaskAttemptId, + maxAttempts: Int, + retryInterval: Int, + timeout: FiniteDuration): Boolean = { + AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), + coordinatorActor, maxAttempts = maxAttempts, retryInterval, timeout) + } + + def taskCompleted( + stage: StageId, + task: TaskId, + attempt: TaskAttemptId, + successful: Boolean) { + coordinatorActor ! TaskCompleted(stage, task, attempt, successful) + } + + def stop() { + coordinatorActor ! PoisonPill + } + + private def handleStageStart(stage: StageId): Unit = { + // TODO: assert that we're not overwriting an existing entry? 
+ authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]() + } + + private def handleStageEnd(stage: StageId): Unit = { + authorizedCommittersByStage.remove(stage) + } + + private def handleAskPermissionToCommit( + stage: StageId, + task: TaskId, + attempt: TaskAttemptId): + Boolean = { + if (!authorizedCommittersByStage.contains(stage)) { + logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit") + return false + } + val authorizedCommitters = authorizedCommittersByStage(stage) + if (authorizedCommitters.contains(task)) { + val existingCommitter = authorizedCommitters(task) + logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " + + s"existingCommitter = $existingCommitter") + false + } else { + logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task") + authorizedCommitters(task) = attempt + true + } + } + + private def handleTaskCompletion( + stage: StageId, + task: TaskId, + attempt: TaskAttemptId, + successful: Boolean): Unit = { + if (!authorizedCommittersByStage.contains(stage)) { + logDebug(s"Ignoring task completion for completed stage") + return + } + val authorizedCommitters = authorizedCommittersByStage(stage) + if (authorizedCommitters.get(task) == Some(attempt) && !successful) { + logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed; clearing lock") + // The authorized committer failed; clear the lock so future attempts can commit their output + authorizedCommitters.remove(task) + } + } +} + +private[spark] object OutputCommitCoordinator { + + class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator) + extends Actor with ActorLogReceive with Logging { + + override def receiveWithLogging = { + case StageStarted(stage) => + outputCommitCoordinator.handleStageStart(stage) + case StageEnded(stage) => + outputCommitCoordinator.handleStageEnd(stage) + case AskPermissionToCommitOutput(stage, task, taskAttempt) => + sender ! 
outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt) + case TaskCompleted(stage, task, attempt, successful) => + outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful) + } + } + def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = { + new OutputCommitCoordinatorActor(coordinator) + } +} + + From bc80770fea44474aba5e1ece57968e7c8c311f85 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 17:31:39 -0800 Subject: [PATCH 02/19] Unit tests for OutputCommitCoordinator --- .../scheduler/OutputCommitCoordinator.scala | 7 +- ...AGSchedulerSingleThreadedProcessLoop.scala | 33 ++++ .../spark/scheduler/DAGSchedulerSuite.scala | 18 +- .../OutputCommitCoordinatorSuite.scala | 180 ++++++++++++++++++ 4 files changed, 219 insertions(+), 19 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSingleThreadedProcessLoop.scala create mode 100644 core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index cae7f38ac6e52..d6726bcb9a71a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -17,13 +17,14 @@ package org.apache.spark.scheduler +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration + import akka.actor.{PoisonPill, ActorRef, Actor} + import org.apache.spark.Logging import org.apache.spark.util.{AkkaUtils, ActorLogReceive} -import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration - private[spark] sealed trait OutputCommitCoordinationMessage private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSingleThreadedProcessLoop.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSingleThreadedProcessLoop.scala new file mode 100644 index 0000000000000..3df2146a449f0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSingleThreadedProcessLoop.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.control.NonFatal + +class DAGSchedulerSingleThreadedProcessLoop(dagScheduler: DAGScheduler) + extends DAGSchedulerEventProcessLoop(dagScheduler) { + + override def post(event: DAGSchedulerEvent): Unit = { + try { + // Forward event to `onReceive` directly to avoid processing event asynchronously. 
+ onReceive(event) + } catch { + case NonFatal(e) => onError(e) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index eb116213f69fc..2c67eba700a66 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls -import scala.util.control.NonFatal import org.scalatest.{BeforeAndAfter, FunSuiteLike} import org.scalatest.concurrent.Timeouts @@ -32,19 +31,6 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.CallSite import org.apache.spark.executor.TaskMetrics -class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) - extends DAGSchedulerEventProcessLoop(dagScheduler) { - - override def post(event: DAGSchedulerEvent): Unit = { - try { - // Forward event to `onReceive` directly to avoid processing event asynchronously. - onReceive(event) - } catch { - case NonFatal(e) => onError(e) - } - } -} - /** * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and * preferredLocations (if any) that are passed to them. They are deliberately not executable @@ -171,7 +157,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar runLocallyWithinThread(job) } } - dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) + dagEventProcessLoopTester = new DAGSchedulerSingleThreadedProcessLoop(scheduler) } override def afterAll() { @@ -399,7 +385,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar runLocallyWithinThread(job) } } - dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler) + dagEventProcessLoopTester = new DAGSchedulerSingleThreadedProcessLoop(noKillScheduler) val jobId = submit(new MyRDD(sc, 1, Nil), Array(0)) cancel(jobId) // Because the job wasn't actually cancelled, we shouldn't have received a failure message. diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala new file mode 100644 index 0000000000000..8d068bac2a279 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} + +import scala.collection.mutable + +import org.scalatest.concurrent.Timeouts +import org.scalatest.{BeforeAndAfter, FunSuiteLike} + +import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter} +import org.mockito.Mockito._ + +import org.apache.spark._ +import org.apache.spark.executor.{TaskMetrics} +import org.apache.spark.rdd.FakeOutputCommitter + +/** + * Unit tests for the output commit coordination functionality. Overrides the SchedulerImpl + * to just run the tasks directly and send completion or error messages back to the + * DAG scheduler. + */ +class OutputCommitCoordinatorSuite + extends FunSuiteLike + with BeforeAndAfter + with LocalSparkContext + with Timeouts { + + val conf = new SparkConf().set("spark.localExecution.enabled", "true") + + var taskScheduler: TaskSchedulerImpl = null + var dagScheduler: DAGScheduler = null + var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null + var accum: Accumulator[Int] = null + var accumId: Long = 0 + + before { + sc = new SparkContext("local", "Output Commit Coordinator Suite") + accum = sc.accumulator[Int](0) + Accumulators.register(accum, true) + accumId = accum.id + + taskScheduler = new TaskSchedulerImpl(sc, 4, true) { + override def submitTasks(taskSet: TaskSet) { + // Instead of submitting a task to some executor, just run the task directly. + // Make two attempts. The first may or may not succeed. If the first + // succeeds then the second is redundant and should be handled + // accordingly by OutputCommitCoordinator. Otherwise the second + // should not be blocked from succeeding. + execTasks(taskSet, 0) + execTasks(taskSet, 1) + } + + private def execTasks(taskSet: TaskSet, attemptNumber: Int) { + var taskIndex = 0 + taskSet.tasks.foreach(t => { + val tid = newTaskId + val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, "0", + "localhost", TaskLocality.NODE_LOCAL, false) + taskIndex += 1 + // Track the successful commits in an accumulator. However, we can't just invoke + // accum += 1 since this unit test circumvents the usual accumulator updating + // infrastructure. So just send the accumulator update manually. 
+ val accumUpdates = new mutable.HashMap[Long, Any] + try { + accumUpdates(accumId) = t.run(attemptNumber, attemptNumber) + dagSchedulerEventProcessLoop.post( + new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics)) + } catch { + case e: Throwable => + dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e, + Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics)) + } + }) + } + } + + dagScheduler = new DAGScheduler(sc, taskScheduler) + taskScheduler.setDAGScheduler(dagScheduler) + sc.dagScheduler = dagScheduler + dagSchedulerEventProcessLoop = new DAGSchedulerSingleThreadedProcessLoop(dagScheduler) + } + + /** + * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit + */ + private class OutputCommittingFunctionWithAccumulator(var accum: Accumulator[Int]) + extends ((TaskContext, Iterator[Int]) => Int) with Serializable { + + def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { + val outputCommitter = new FakeOutputCommitter { + override def commitTask(taskAttemptContext: TaskAttemptContext) { + super.commitTask(taskAttemptContext) + } + } + runCommitWithProvidedCommitter(ctxt, it, outputCommitter) + } + + protected def runCommitWithProvidedCommitter( + ctxt: TaskContext, + it: Iterator[Int], + outputCommitter: OutputCommitter): Int = { + def jobConf = new JobConf { + override def getOutputCommitter(): OutputCommitter = outputCommitter + } + val sparkHadoopWriter = new SparkHadoopWriter(jobConf) { + override def newTaskAttemptContext( + conf: JobConf, + attemptId: TaskAttemptID): TaskAttemptContext = { + mock(classOf[TaskAttemptContext]) + } + } + sparkHadoopWriter.setup(ctxt.stageId, ctxt.partitionId, ctxt.attemptNumber) + sparkHadoopWriter.commit + if (FakeOutputCommitter.ran) { + FakeOutputCommitter.ran = false + 1 + } else { + 0 + } + } + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream) { + out.writeObject(accum) + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream) { + accum = in.readObject.asInstanceOf[Accumulator[Int]] + } + } + + /** + * Function that will explicitly fail to commit on the first attempt + */ + private class FailFirstTimeCommittingFunctionWithAccumulator(accum: Accumulator[Int]) + extends OutputCommittingFunctionWithAccumulator(accum) { + override def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { + if (ctxt.attemptNumber == 0) { + val outputCommitter = new FakeOutputCommitter { + override def commitTask(taskAttemptContext: TaskAttemptContext) { + throw new RuntimeException + } + } + runCommitWithProvidedCommitter(ctxt, it, outputCommitter) + } else { + super.apply(ctxt, it) + } + } + } + + test("Only one of two duplicate commit tasks should commit") { + val rdd = sc.parallelize(1 to 10, 10) + sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator(accum)) + assert(accum.value === 10) + } + + test("If commit fails, if task is retried it should not be locked, and will succeed.") { + val rdd = sc.parallelize(Seq(1), 1) + sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator(accum)) + assert(accum.value == 1) + } +} From c9decc68377db246e720afe802a91852b043e364 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 17:34:05 -0800 Subject: [PATCH 03/19] Scalastyle fixes --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- .../apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 165c0a8f17c4d..839ef7275f2ce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -913,7 +913,8 @@ class DAGScheduler( val taskType = Utils.getFormattedClassName(task) val isSuccess = event.reason == Success - outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.taskId, isSuccess) + outputCommitCoordinator.taskCompleted(stageId, task.partitionId, + event.taskInfo.taskId, isSuccess) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 8d068bac2a279..10e903bc804ae 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -81,11 +81,11 @@ class OutputCommitCoordinatorSuite try { accumUpdates(accumId) = t.run(attemptNumber, attemptNumber) dagSchedulerEventProcessLoop.post( - new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics)) + new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics)) } catch { case e: Throwable => dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e, - Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics)) + Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics)) } }) } From 6b543bae5d9a6ea27408a57663986d3947ea7ab1 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 17:52:14 -0800 Subject: [PATCH 04/19] Removing redundant accumulator in unit test --- .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 10e903bc804ae..8d0da9fc78fdd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -100,7 +100,7 @@ class OutputCommitCoordinatorSuite /** * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit */ - private class OutputCommittingFunctionWithAccumulator(var accum: Accumulator[Int]) + private class OutputCommittingFunctionWithAccumulator extends ((TaskContext, Iterator[Int]) => Int) with Serializable { def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { @@ -150,8 +150,8 @@ class OutputCommitCoordinatorSuite /** * Function that will explicitly fail to commit on the first attempt */ - private class FailFirstTimeCommittingFunctionWithAccumulator(accum: Accumulator[Int]) - extends OutputCommittingFunctionWithAccumulator(accum) { + private class FailFirstTimeCommittingFunctionWithAccumulator + extends OutputCommittingFunctionWithAccumulator { override def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { if (ctxt.attemptNumber == 0) { val outputCommitter = new FakeOutputCommitter { @@ -168,13 +168,13 @@ class OutputCommitCoordinatorSuite test("Only one of two duplicate commit tasks should commit") { val rdd = 
sc.parallelize(1 to 10, 10) - sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator(accum)) + sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator) assert(accum.value === 10) } test("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) - sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator(accum)) + sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator) assert(accum.value == 1) } } From 66a71cd85c7e0ddf843b2318058c9dceb9e2c0c3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 18:25:33 -0800 Subject: [PATCH 05/19] Removing whitespace modifications --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/DAGSchedulerEvent.scala | 1 - .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 839ef7275f2ce..d6cc6c04004bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -113,6 +113,7 @@ class DAGScheduler( // This is only safe because DAGScheduler runs in a single thread. private val closureSerializer = SparkEnv.get.closureSerializer.newInstance() + /** If enabled, we may run certain actions like take() and first() locally. */ private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false) @@ -1374,7 +1375,6 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ResubmitFailedStages => dagScheduler.resubmitFailedStages() - } override def onError(e: Throwable): Unit = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index b3ae465b03210..2b6f7e4205c32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -77,4 +77,3 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent - diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 8d0da9fc78fdd..d4aaa35932dcb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -32,9 +32,9 @@ import org.apache.spark.executor.{TaskMetrics} import org.apache.spark.rdd.FakeOutputCommitter /** - * Unit tests for the output commit coordination functionality. Overrides the SchedulerImpl - * to just run the tasks directly and send completion or error messages back to the - * DAG scheduler. + * Unit tests for the output commit coordination functionality. Overrides the + * SchedulerImpl to just run the tasks directly and send completion or error + * messages back to the DAG scheduler. 
*/ class OutputCommitCoordinatorSuite extends FunSuiteLike From 1c2b21908225a644f268adb8e1e67a4479492532 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 21 Jan 2015 18:27:14 -0800 Subject: [PATCH 06/19] Renaming oudated names for test function classes --- .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index d4aaa35932dcb..c038ce3b65c52 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -100,7 +100,7 @@ class OutputCommitCoordinatorSuite /** * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit */ - private class OutputCommittingFunctionWithAccumulator + private class OutputCommittingFunction extends ((TaskContext, Iterator[Int]) => Int) with Serializable { def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { @@ -150,8 +150,8 @@ class OutputCommitCoordinatorSuite /** * Function that will explicitly fail to commit on the first attempt */ - private class FailFirstTimeCommittingFunctionWithAccumulator - extends OutputCommittingFunctionWithAccumulator { + private class FailFirstTimeCommittingFunction + extends OutputCommittingFunction { override def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { if (ctxt.attemptNumber == 0) { val outputCommitter = new FakeOutputCommitter { @@ -168,13 +168,13 @@ class OutputCommitCoordinatorSuite test("Only one of two duplicate commit tasks should commit") { val rdd = sc.parallelize(1 to 10, 10) - sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator) + sc.runJob(rdd, new OutputCommittingFunction) assert(accum.value === 10) } test("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) - sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator) + sc.runJob(rdd, new FailFirstTimeCommittingFunction) assert(accum.value == 1) } } From f135a8ec6744e555a5d5d1ee1fdc71acf8d451d7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 09:49:08 -0800 Subject: [PATCH 07/19] Moving the output commit coordinator from class into method. The SparkHadoopWriter wasn't serializable because the commit coordinator was a class member. It doesn't need to be, so just get it on demand when committing the task. 
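To make the serialization failure concrete, here is a minimal, self-contained sketch of the problem and of the fix; the names (NonSerializableCoordinator, Registry, EagerWriter, LazyWriter) are illustrative stand-ins, not the actual Spark types. A writer that keeps the coordinator in a field drags it into Java serialization, whereas one that looks it up on demand inside the method captures nothing extra:

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    class NonSerializableCoordinator                    // stands in for OutputCommitCoordinator
    object Registry { val coordinator = new NonSerializableCoordinator }

    class EagerWriter extends Serializable {
      private val coordinator = Registry.coordinator    // field is serialized along with the writer
      def commit(): Unit = println(coordinator)
    }

    class LazyWriter extends Serializable {
      def commit(): Unit = {
        val coordinator = Registry.coordinator          // fetched on demand, never serialized
        println(coordinator)
      }
    }

    def roundTrip(obj: AnyRef): Unit = {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try { out.writeObject(obj); println("serialized OK") }
      catch { case e: NotSerializableException => println("failed: " + e) }
    }

    roundTrip(new EagerWriter)   // fails: the field pulls in the non-serializable coordinator
    roundTrip(new LazyWriter)    // succeeds: no non-serializable state is captured

Run in a Scala REPL, the first round trip reports a NotSerializableException and the second succeeds, which is the same distinction SparkHadoopWriter hits when it is shipped to executors.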
--- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index f238a17b464db..a4d13c972c01a 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -45,8 +45,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private val now = new Date() private val conf = new SerializableWritable(jobConf) - private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator - private var jobID = 0 private var splitID = 0 private var attemptID = 0 @@ -109,6 +107,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val taCtxt = getTaskContext() val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { + val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator val conf = SparkEnv.get.conf val timeout = AkkaUtils.askTimeout(conf) val maxAttempts = AkkaUtils.numRetries(conf) From abc7db44c145e10d6a26d71c63c3f541bab6d274 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 12:47:28 -0800 Subject: [PATCH 08/19] TaskInfo can't be null in DAGSchedulerSuite But we don't care about what's actually in the TaskInfo object - testing OutputCommitCoordinator is done in OutputCommitCoordinatorSuite which is the only code path that relies on the TaskInfo's fields. Nevertheless we need the Task Info to not be null to allow that code path to not throw an NPE. --- .../spark/scheduler/DAGSchedulerSuite.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2c67eba700a66..a54b02a7d5dd7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -24,6 +24,8 @@ import org.scalatest.{BeforeAndAfter, FunSuiteLike} import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ +import org.mockito.Mockito.mock + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -194,7 +196,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { - runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null)) + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo, null)) } } } @@ -205,7 +207,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, - Map[Long, Any]((accumId, 1)), null, null)) + Map[Long, Any]((accumId, 1)), createFakeTaskInfo, null)) } } } @@ -462,7 +464,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null, Map[Long, Any](), - null, + createFakeTaskInfo, null)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sparkListener.failedStages.contains(1)) @@ -473,7 +475,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar 
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"), null, Map[Long, Any](), - null, + createFakeTaskInfo, null)) // The SparkListener should not receive redundant failure events. assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) @@ -493,14 +495,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assert(newEpoch > oldEpoch) val taskSet = taskSets(0) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) // should work because it's a non-failed host - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo, null)) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) // should work because it's a new epoch taskSet.tasks(1).epoch = newEpoch - runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null)) + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) @@ -752,5 +754,10 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assert(scheduler.shuffleToMapStage.isEmpty) assert(scheduler.waitingStages.isEmpty) } + + // Nothing in this test should break if the task info's fields are null, but + // OutputCommitCoordinator requires the task info itself to not be null. + private def createFakeTaskInfo(): TaskInfo = mock(classOf[TaskInfo]) + } From 83de900eb60125e1190670e29938fc0b8634bdeb Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 16:07:00 -0800 Subject: [PATCH 09/19] Making the OutputCommitCoordinatorMessage serializable --- .../org/apache/spark/scheduler/OutputCommitCoordinator.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index d6726bcb9a71a..ee20896de1449 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -25,7 +25,7 @@ import akka.actor.{PoisonPill, ActorRef, Actor} import org.apache.spark.Logging import org.apache.spark.util.{AkkaUtils, ActorLogReceive} -private[spark] sealed trait OutputCommitCoordinationMessage +private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage @@ -54,7 +54,6 @@ private[spark] class OutputCommitCoordinator extends Logging { // Initialized by SparkEnv var coordinatorActor: ActorRef = _ - // TODO: handling stage attempt ids? 
private type StageId = Int private type TaskId = Long private type TaskAttemptId = Long @@ -62,6 +61,7 @@ private[spark] class OutputCommitCoordinator extends Logging { private val authorizedCommittersByStage: mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() + def stageStart(stage: StageId) { coordinatorActor ! StageStarted(stage) } @@ -102,7 +102,6 @@ private[spark] class OutputCommitCoordinator extends Logging { } private def handleStageStart(stage: StageId): Unit = { - // TODO: assert that we're not overwriting an existing entry? authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]() } From 78eb1b58cee0b5baf532ffde2a90e0fc31657d69 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 17:15:54 -0800 Subject: [PATCH 10/19] Better OutputCommitCoordinatorActor stopping; simpler canCommit Before, a poison pill message was sent to the actor. That is not the paradigm that is used on other actors in Spark though, so making this more like the e.g. MapOutputTracker Actor. --- .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../org/apache/spark/SparkHadoopWriter.scala | 6 +-- .../scheduler/OutputCommitCoordinator.scala | 37 +++++++++---------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 651b2cbdfcca0..0c05daa70789a 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -348,7 +348,7 @@ object SparkEnv extends Logging { "levels using the RDD.persist() method instead.") } - val outputCommitCoordinator = new OutputCommitCoordinator + val outputCommitCoordinator = new OutputCommitCoordinator(conf) val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", OutputCommitCoordinator.createActor(outputCommitCoordinator)) outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index a4d13c972c01a..bc5c7c5509d89 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -109,11 +109,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) if (cmtr.needsTaskCommit(taCtxt)) { val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator val conf = SparkEnv.get.conf - val timeout = AkkaUtils.askTimeout(conf) - val maxAttempts = AkkaUtils.numRetries(conf) - val retryInterval = AkkaUtils.retryWaitMs(conf) - val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID, - maxAttempts, retryInterval, timeout) + val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID) if (canCommit) { try { cmtr.commitTask(taCtxt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index ee20896de1449..a91f953cfcb5e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -22,13 +22,14 @@ import scala.concurrent.duration.FiniteDuration import akka.actor.{PoisonPill, ActorRef, Actor} -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.{AkkaUtils, ActorLogReceive} private[spark] 
sealed trait OutputCommitCoordinationMessage extends Serializable private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage +private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage private[spark] case class AskPermissionToCommitOutput( stage: Int, @@ -49,10 +50,13 @@ private[spark] case class TaskCompleted( * This lives on the driver, but the actor allows the tasks that commit * to Hadoop to invoke it. */ -private[spark] class OutputCommitCoordinator extends Logging { +private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { // Initialized by SparkEnv var coordinatorActor: ActorRef = _ + val timeout = AkkaUtils.askTimeout(conf) + val maxAttempts = AkkaUtils.numRetries(conf) + val retryInterval = AkkaUtils.retryWaitMs(conf) private type StageId = Int private type TaskId = Long @@ -61,7 +65,6 @@ private[spark] class OutputCommitCoordinator extends Logging { private val authorizedCommittersByStage: mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() - def stageStart(stage: StageId) { coordinatorActor ! StageStarted(stage) } @@ -72,21 +75,9 @@ private[spark] class OutputCommitCoordinator extends Logging { def canCommit( stage: StageId, task: TaskId, - attempt: TaskAttemptId, - timeout: FiniteDuration): Boolean = { + attempt: TaskAttemptId): Boolean = { AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), - coordinatorActor, timeout) - } - - def canCommit( - stage: StageId, - task: TaskId, - attempt: TaskAttemptId, - maxAttempts: Int, - retryInterval: Int, - timeout: FiniteDuration): Boolean = { - AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), - coordinatorActor, maxAttempts = maxAttempts, retryInterval, timeout) + coordinatorActor, maxAttempts, retryInterval, timeout) } def taskCompleted( @@ -98,7 +89,10 @@ private[spark] class OutputCommitCoordinator extends Logging { } def stop() { - coordinatorActor ! PoisonPill + val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, coordinatorActor, timeout) + if (!stopped) { + logWarning("Expected true from stopping output coordinator actor, but got false!") + } } private def handleStageStart(stage: StageId): Unit = { @@ -147,6 +141,7 @@ private[spark] class OutputCommitCoordinator extends Logging { authorizedCommitters.remove(task) } } + } private[spark] object OutputCommitCoordinator { @@ -163,11 +158,13 @@ private[spark] object OutputCommitCoordinator { sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt) case TaskCompleted(stage, task, attempt, successful) => outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful) + case StopCoordinator => + logInfo("OutputCommitCoordinator stopped!") + context.stop(self) + sender ! 
true } } def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = { new OutputCommitCoordinatorActor(coordinator) } } - - From 9c6a4fa7be88428a0de0825d0c77bd990c6a1f2f Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 17:37:23 -0800 Subject: [PATCH 11/19] More OutputCommitCoordinator cleanup on stop() --- .../org/apache/spark/scheduler/OutputCommitCoordinator.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index a91f953cfcb5e..a2ff32d68ae99 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -93,6 +93,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { if (!stopped) { logWarning("Expected true from stopping output coordinator actor, but got false!") } + authorizedCommittersByStage.foreach(_._2.clear) + authorizedCommittersByStage.clear } private def handleStageStart(stage: StageId): Unit = { From 8d5a091bcb88b4376bffb0e16da9e388e7e79afe Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 18:14:07 -0800 Subject: [PATCH 12/19] Was mistakenly serializing the accumulator in test suite. --- .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index c038ce3b65c52..d8383f14d60bf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -137,14 +137,10 @@ class OutputCommitCoordinatorSuite } @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) { - out.writeObject(accum) - } + private def writeObject(out: ObjectOutputStream) {} @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) { - accum = in.readObject.asInstanceOf[Accumulator[Int]] - } + private def readObject(in: ObjectInputStream) {} } /** From c3342552e03d690ac4beea939b5abd13363698c4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 23:14:18 -0800 Subject: [PATCH 13/19] Properly handling messages that could be sent after actor shutdown. 
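The shape of the change is easy to see in isolation. Below is a minimal sketch of the pattern the diff adopts, with CoordinatorRef as a plain stand-in class rather than Akka's ActorRef: the reference is held in an Option, every send and ask goes through a small helper, and stop() clears the Option so that a message arriving afterwards is dropped and an ask falls back to a safe default instead of hitting a dead actor:

    class CoordinatorRef {                            // stand-in for the real actor reference
      def send(msg: Any): Unit = println(s"handled $msg")
      def ask(msg: Any): Boolean = true
    }

    class Coordinator {
      private var ref: Option[CoordinatorRef] = Some(new CoordinatorRef)

      private def sendToActor(msg: Any): Unit = ref.foreach(_.send(msg))
      private def askActor(msg: Any): Boolean = ref.map(_.ask(msg)).getOrElse(false)

      def canCommit(stage: Int, task: Long, attempt: Long): Boolean =
        askActor(("AskPermissionToCommitOutput", stage, task, attempt))

      def stop(): Unit = {
        sendToActor("StopCoordinator")
        ref = None                                    // later messages become no-ops
      }
    }

    val coordinator = new Coordinator
    println(coordinator.canCommit(1, 0L, 0L))   // true: the reference is still registered
    coordinator.stop()
    println(coordinator.canCommit(1, 0L, 0L))   // false: ask after shutdown is answered locally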
--- .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../scheduler/OutputCommitCoordinator.scala | 32 +++++++++++-------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 0c05daa70789a..379b724ccb9a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -351,7 +351,7 @@ object SparkEnv extends Logging { val outputCommitCoordinator = new OutputCommitCoordinator(conf) val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", OutputCommitCoordinator.createActor(outputCommitCoordinator)) - outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor + outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor) new SparkEnv( executorId, diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index a2ff32d68ae99..a5d7ccf4a455f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -53,10 +53,10 @@ private[spark] case class TaskCompleted( private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { // Initialized by SparkEnv - var coordinatorActor: ActorRef = _ - val timeout = AkkaUtils.askTimeout(conf) - val maxAttempts = AkkaUtils.numRetries(conf) - val retryInterval = AkkaUtils.retryWaitMs(conf) + var coordinatorActor: Option[ActorRef] = None + private val timeout = AkkaUtils.askTimeout(conf) + private val maxAttempts = AkkaUtils.numRetries(conf) + private val retryInterval = AkkaUtils.retryWaitMs(conf) private type StageId = Int private type TaskId = Long @@ -66,18 +66,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() def stageStart(stage: StageId) { - coordinatorActor ! StageStarted(stage) + sendToActor(StageStarted(stage)) } def stageEnd(stage: StageId) { - coordinatorActor ! StageEnded(stage) + sendToActor(StageEnded(stage)) } def canCommit( stage: StageId, task: TaskId, attempt: TaskAttemptId): Boolean = { - AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), - coordinatorActor, maxAttempts, retryInterval, timeout) + askActor(AskPermissionToCommitOutput(stage, task, attempt)) } def taskCompleted( @@ -85,14 +84,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { task: TaskId, attempt: TaskAttemptId, successful: Boolean) { - coordinatorActor ! TaskCompleted(stage, task, attempt, successful) + sendToActor(TaskCompleted(stage, task, attempt, successful)) } def stop() { - val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, coordinatorActor, timeout) - if (!stopped) { - logWarning("Expected true from stopping output coordinator actor, but got false!") - } + sendToActor(StopCoordinator) + coordinatorActor = None authorizedCommittersByStage.foreach(_._2.clear) authorizedCommittersByStage.clear } @@ -144,6 +141,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { } } + private def sendToActor(msg: OutputCommitCoordinationMessage) { + coordinatorActor.foreach(_ ! 
msg) + } + + private def askActor(msg: OutputCommitCoordinationMessage): Boolean = { + coordinatorActor + .map(AkkaUtils.askWithReply[Boolean](msg, _, maxAttempts, retryInterval, timeout)) + .getOrElse(false) + } } private[spark] object OutputCommitCoordinator { From d4311443b1aa652f103c46f6baae2f0d9721564f Mon Sep 17 00:00:00 2001 From: mcheah Date: Sun, 25 Jan 2015 21:15:00 -0800 Subject: [PATCH 14/19] Using more concurrency to process OutputCommitCoordinator requests. The OutputCommitCoordinator now delegates commit authorization to a thread pool. The synchronization at least allows multiple tasks that are committing to different partitions to have their requests processed concurrently. There is still a single message queue in the actor but the message processing should be fast if it is just submitting a Runnable. Also some style fixes in here. --- .../scala/org/apache/spark/SparkEnv.scala | 6 +- .../org/apache/spark/SparkHadoopWriter.scala | 4 +- .../apache/spark/scheduler/DAGScheduler.scala | 7 +- .../scheduler/OutputCommitCoordinator.scala | 196 ++++++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 16 +- .../OutputCommitCoordinatorSuite.scala | 13 +- 6 files changed, 160 insertions(+), 82 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 379b724ccb9a4..646cf050f0e15 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -20,6 +20,8 @@ package org.apache.spark import java.io.File import java.net.Socket +import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor + import scala.collection.JavaConversions._ import scala.collection.mutable import scala.util.Properties @@ -350,8 +352,8 @@ object SparkEnv extends Logging { val outputCommitCoordinator = new OutputCommitCoordinator(conf) val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", - OutputCommitCoordinator.createActor(outputCommitCoordinator)) - outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor) + new OutputCommitCoordinatorActor(outputCommitCoordinator)) + outputCommitCoordinator.initialize(outputCommitCoordinatorActor, isDriver) new SparkEnv( executorId, diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index bc5c7c5509d89..92238c4524b9c 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -108,8 +108,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator - val conf = SparkEnv.get.conf - val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID) + val canCommit = outputCommitCoordinator.canCommit(jobID, + taID.value.getTaskID().getId(), splitID, attemptID) if (canCommit) { try { cmtr.commitTask(taCtxt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d6cc6c04004bc..dc6457de63713 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -810,7 +810,7 @@ class DAGScheduler( // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) - outputCommitCoordinator.stageStart(stage.id) + outputCommitCoordinator.stageStart(stage.id, partitionsToCompute) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. @@ -914,8 +914,9 @@ class DAGScheduler( val taskType = Utils.getFormattedClassName(task) val isSuccess = event.reason == Success - outputCommitCoordinator.taskCompleted(stageId, task.partitionId, - event.taskInfo.taskId, isSuccess) + outputCommitCoordinator.taskCompleted(stageId, event.taskInfo.taskId, + task.partitionId, event.taskInfo.attempt, + isSuccess) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index a5d7ccf4a455f..7d4b7a3d51b87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -17,32 +17,38 @@ package org.apache.spark.scheduler -import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration +import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap} -import akka.actor.{PoisonPill, ActorRef, Actor} +import scala.collection.{Map => ScalaImmutableMap} +import scala.collection.concurrent.{Map => ScalaConcurrentMap} +import scala.collection.convert.decorateAsScala._ + +import akka.actor.{ActorRef, Actor} import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.{AkkaUtils, ActorLogReceive} -private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable +private[spark] sealed trait OutputCommitCoordinationMessage -private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage +private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int]) + extends OutputCommitCoordinationMessage private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage private[spark] case class AskPermissionToCommitOutput( stage: Int, task: Long, + partId: Int, taskAttempt: Long) - extends OutputCommitCoordinationMessage + extends OutputCommitCoordinationMessage with Serializable private[spark] case class TaskCompleted( stage: Int, task: Long, + partId: Int, attempt: Long, successful: Boolean) - extends OutputCommitCoordinationMessage + extends OutputCommitCoordinationMessage /** * Authority that decides whether tasks can commit output to HDFS. @@ -52,96 +58,150 @@ private[spark] case class TaskCompleted( */ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { + private type StageId = Int + private type PartitionId = Int + private type TaskId = Long + private type TaskAttemptId = Long + + // Wrapper for an int option that allows it to be locked via a synchronized block + // while still setting option itself to Some(...) or None. 
+ private class LockableAttemptId(var value: Option[TaskAttemptId]) + + private type CommittersByStageHashMap = + ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]] + // Initialized by SparkEnv - var coordinatorActor: Option[ActorRef] = None + private var coordinatorActor: Option[ActorRef] = None private val timeout = AkkaUtils.askTimeout(conf) private val maxAttempts = AkkaUtils.numRetries(conf) private val retryInterval = AkkaUtils.retryWaitMs(conf) + private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala - private type StageId = Int - private type TaskId = Long - private type TaskAttemptId = Long - - private val authorizedCommittersByStage: - mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() + private var executorRequestHandlingThreadPool: Option[ExecutorService] = None - def stageStart(stage: StageId) { - sendToActor(StageStarted(stage)) + def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = { + sendToActor(StageStarted(stage, partitionIds)) } - def stageEnd(stage: StageId) { + + def stageEnd(stage: StageId): Unit = { sendToActor(StageEnded(stage)) } def canCommit( stage: StageId, task: TaskId, + partId: PartitionId, attempt: TaskAttemptId): Boolean = { - askActor(AskPermissionToCommitOutput(stage, task, attempt)) + askActor(AskPermissionToCommitOutput(stage, task, partId, attempt)) } def taskCompleted( stage: StageId, task: TaskId, + partId: PartitionId, attempt: TaskAttemptId, - successful: Boolean) { - sendToActor(TaskCompleted(stage, task, attempt, successful)) + successful: Boolean): Unit = { + sendToActor(TaskCompleted(stage, task, partId, attempt, successful)) } - def stop() { + def stop(): Unit = { + executorRequestHandlingThreadPool.foreach { pool => + pool.shutdownNow() + pool.awaitTermination(10, TimeUnit.SECONDS) + } sendToActor(StopCoordinator) coordinatorActor = None - authorizedCommittersByStage.foreach(_._2.clear) + executorRequestHandlingThreadPool = None authorizedCommittersByStage.clear } - private def handleStageStart(stage: StageId): Unit = { - authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]() + def initialize(actor: ActorRef, isDriver: Boolean): Unit = { + coordinatorActor = Some(actor) + executorRequestHandlingThreadPool = { + if (isDriver) { + Some(Executors.newFixedThreadPool(4)) + } else { + None + } + } + } + + // Methods that mutate the internal state of the coordinator shouldn't be + // called directly, and are thus made private instead of public. The + // private methods should be called from the Actor, and callers use the + // public methods to send messages to the actor. 
+ private def handleStageStart(stage: StageId, partitionIds: Seq[Int]): Unit = { + val initialLockStates = partitionIds.map(partId => { + partId -> new LockableAttemptId(None) + }).toMap + authorizedCommittersByStage.put(stage, initialLockStates) } private def handleStageEnd(stage: StageId): Unit = { authorizedCommittersByStage.remove(stage) } - private def handleAskPermissionToCommit( + private def determineIfCommitAllowed( stage: StageId, task: TaskId, - attempt: TaskAttemptId): - Boolean = { - if (!authorizedCommittersByStage.contains(stage)) { - logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit") - return false - } - val authorizedCommitters = authorizedCommittersByStage(stage) - if (authorizedCommitters.contains(task)) { - val existingCommitter = authorizedCommitters(task) - logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " + - s"existingCommitter = $existingCommitter") - false - } else { - logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task") - authorizedCommitters(task) = attempt - true + partId: PartitionId, + attempt: TaskAttemptId): Boolean = { + authorizedCommittersByStage.get(stage) match { + case Some(authorizedCommitters) => + val authorizedCommitMetadataForPart = authorizedCommitters(partId) + authorizedCommitMetadataForPart.synchronized { + // Don't use match - we'll be setting the value of the option in the else block + if (authorizedCommitMetadataForPart.value.isDefined) { + val existingCommitter = authorizedCommitMetadataForPart.value.get + logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " + + s"existingCommitter = $existingCommitter") + false + } else { + logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task") + authorizedCommitMetadataForPart.value = Some(attempt) + true + } + } + case None => + logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit") + false } } + private def handleAskPermissionToCommitOutput( + requester: ActorRef, + stage: StageId, + task: TaskId, + partId: PartitionId, + attempt: TaskAttemptId): Unit = { + executorRequestHandlingThreadPool.foreach(_.submit( + new AskCommitRunnable(requester, this, stage, task, partId, attempt))) + } + private def handleTaskCompletion( stage: StageId, task: TaskId, + partId: PartitionId, attempt: TaskAttemptId, successful: Boolean): Unit = { - if (!authorizedCommittersByStage.contains(stage)) { - logDebug(s"Ignoring task completion for completed stage") - return - } - val authorizedCommitters = authorizedCommittersByStage(stage) - if (authorizedCommitters.get(task) == Some(attempt) && !successful) { - logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed; clearing lock") - // The authorized committer failed; clear the lock so future attempts can commit their output - authorizedCommitters.remove(task) + authorizedCommittersByStage.get(stage) match { + case Some(authorizedCommitters) => + val authorizedCommitMetadataForPart = authorizedCommitters(partId) + authorizedCommitMetadataForPart.synchronized { + if (authorizedCommitMetadataForPart.value == Some(attempt) && !successful) { + logDebug(s"Authorized committer $attempt (stage=$stage," + + s" task=$task) failed; clearing lock") + // The authorized committer failed; clear the lock so future attempts can + // commit their output + authorizedCommitMetadataForPart.value = None + } + } + case None => + logDebug(s"Ignoring task completion for completed stage") } } - private def sendToActor(msg: 
OutputCommitCoordinationMessage) { + private def sendToActor(msg: OutputCommitCoordinationMessage): Unit = { coordinatorActor.foreach(_ ! msg) } @@ -150,29 +210,43 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { .map(AkkaUtils.askWithReply[Boolean](msg, _, maxAttempts, retryInterval, timeout)) .getOrElse(false) } + + class AskCommitRunnable( + private val requester: ActorRef, + private val outputCommitCoordinator: OutputCommitCoordinator, + private val stage: StageId, + private val task: TaskId, + private val partId: PartitionId, + private val taskAttempt: TaskAttemptId) + extends Runnable { + override def run(): Unit = { + requester ! outputCommitCoordinator.determineIfCommitAllowed(stage, task, partId, taskAttempt) + } + } } private[spark] object OutputCommitCoordinator { + // Actor is defined inside the OutputCommitCoordinator object so that receiveWithLogging() + // can call the private methods, where it is safe to do so because it is in the actor event + // loop. class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator) extends Actor with ActorLogReceive with Logging { - override def receiveWithLogging = { - case StageStarted(stage) => - outputCommitCoordinator.handleStageStart(stage) + override def receiveWithLogging() = { + case StageStarted(stage, partitionIds) => + outputCommitCoordinator.handleStageStart(stage, partitionIds) case StageEnded(stage) => outputCommitCoordinator.handleStageEnd(stage) - case AskPermissionToCommitOutput(stage, task, taskAttempt) => - sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt) - case TaskCompleted(stage, task, attempt, successful) => - outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful) + case AskPermissionToCommitOutput(stage, task, partId, taskAttempt) => + outputCommitCoordinator.handleAskPermissionToCommitOutput( + sender, stage, task, partId, taskAttempt) + case TaskCompleted(stage, task, partId, attempt, successful) => + outputCommitCoordinator.handleTaskCompletion(stage, task, partId, attempt, successful) case StopCoordinator => logInfo("OutputCommitCoordinator stopped!") context.stop(self) sender ! 
true } } - def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = { - new OutputCommitCoordinatorActor(coordinator) - } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a54b02a7d5dd7..8df46f34d59c8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -196,7 +196,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { - runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo, null)) + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null)) } } } @@ -207,7 +207,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, - Map[Long, Any]((accumId, 1)), createFakeTaskInfo, null)) + Map[Long, Any]((accumId, 1)), createFakeTaskInfo(), null)) } } } @@ -464,7 +464,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null, Map[Long, Any](), - createFakeTaskInfo, + createFakeTaskInfo(), null)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sparkListener.failedStages.contains(1)) @@ -475,7 +475,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"), null, Map[Long, Any](), - createFakeTaskInfo, + createFakeTaskInfo(), null)) // The SparkListener should not receive redundant failure events. 
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) @@ -495,14 +495,14 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assert(newEpoch > oldEpoch) val taskSet = taskSets(0) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) // should work because it's a non-failed host - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null)) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) // should work because it's a new epoch taskSet.tasks(1).epoch = newEpoch - runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo, null)) + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index d8383f14d60bf..19371c69fce1d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -21,11 +21,11 @@ import java.io.{ObjectInputStream, ObjectOutputStream, IOException} import scala.collection.mutable +import org.mockito.Mockito._ import org.scalatest.concurrent.Timeouts -import org.scalatest.{BeforeAndAfter, FunSuiteLike} +import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter} -import org.mockito.Mockito._ import org.apache.spark._ import org.apache.spark.executor.{TaskMetrics} @@ -37,7 +37,7 @@ import org.apache.spark.rdd.FakeOutputCommitter * messages back to the DAG scheduler. 
*/ class OutputCommitCoordinatorSuite - extends FunSuiteLike + extends FunSuite with BeforeAndAfter with LocalSparkContext with Timeouts { @@ -69,7 +69,7 @@ class OutputCommitCoordinatorSuite private def execTasks(taskSet: TaskSet, attemptNumber: Int) { var taskIndex = 0 - taskSet.tasks.foreach(t => { + taskSet.tasks.foreach { t => val tid = newTaskId val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, "0", "localhost", TaskLocality.NODE_LOCAL, false) @@ -87,7 +87,7 @@ class OutputCommitCoordinatorSuite dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e, Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics)) } - }) + } } } @@ -136,6 +136,7 @@ class OutputCommitCoordinatorSuite } } + // Need this otherwise the entire test suite attempts to be serialized @throws(classOf[IOException]) private def writeObject(out: ObjectOutputStream) {} @@ -171,6 +172,6 @@ class OutputCommitCoordinatorSuite test("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) sc.runJob(rdd, new FailFirstTimeCommittingFunction) - assert(accum.value == 1) + assert(accum.value === 1) } } From 1df2a91eb39300a32ad095b37a04846d135e2cc5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Jan 2015 10:54:51 -0800 Subject: [PATCH 15/19] Throwing exception if SparkHadoopWriter commit denied However, the Executor will not treat this like any other ExceptionFailure, but rather send a specific TaskEndReason to the driver. The driver simply ignores the task that failed to commit after logging. If the task that attempted to commit first fails, it will be resubmitted. TODO unit tests, the current unit tests don't capture this workflow all the way down to the Executor level. --- .../apache/spark/CommitDeniedException.scala | 31 +++++++++++++++++++ .../org/apache/spark/SparkHadoopWriter.scala | 8 +++-- .../org/apache/spark/TaskEndReason.scala | 14 +++++++++ .../org/apache/spark/executor/Executor.scala | 5 +++ .../apache/spark/scheduler/DAGScheduler.scala | 3 ++ .../scheduler/OutputCommitCoordinator.scala | 18 +++++++---- .../spark/scheduler/TaskSetManager.scala | 5 ++- .../spark/scheduler/DAGSchedulerSuite.scala | 3 +- 8 files changed, 76 insertions(+), 11 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/CommitDeniedException.scala diff --git a/core/src/main/scala/org/apache/spark/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/CommitDeniedException.scala new file mode 100644 index 0000000000000..3b5bd0c8622e5 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/CommitDeniedException.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Exception thrown when a task attempts to commit output to Hadoop, but + * is denied by the driver. + */ +@DeveloperApi +class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int) + extends Exception(msg) { + def toTaskEndReason(): TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID) +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 92238c4524b9c..e9d791eccdb9c 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -122,10 +122,14 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } } else { - logInfo(s"$taID: Not committed because DAGScheduler did not authorize commit") + val msg: String = s"$taID: Not committed because the driver did not authorize commit" + logInfo(msg) + throw new CommitDeniedException(msg, jobID, splitID, attemptID) } } else { - logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}") + val msg: String = s"No need to commit output of task because needsTaskCommit=false: ${taID.value}" + logInfo(msg) + throw new CommitDeniedException(msg, jobID, splitID, attemptID) } } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index af5fd8e0ac00c..b9ed05c7e1b10 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -146,6 +146,20 @@ case object TaskKilled extends TaskFailedReason { override def toErrorString: String = "TaskKilled (killed intentionally)" } +/** + * :: DeveloperApi :: + * Task requested the driver to commit, but was denied. + */ +@DeveloperApi +case class TaskCommitDenied( + jobID: Int, + splitID: Int, + attemptID: Int) + extends TaskFailedReason { + override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" + + s" for job: $jobID, split: $splitID, attempt: $attemptID" +} + /** * :: DeveloperApi :: * The task failed because the executor that it was running on was lost. This may happen because diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 42566d1a14093..9c3f82e2b2471 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -248,6 +248,11 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) } + case cDE: CommitDeniedException => { + val reason = cDE.toTaskEndReason + execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) + } + case t: Throwable => { // Attempt to exit cleanly by informing the driver of our failure. 
// If anything goes wrong (or this was a fatal exception), we will delegate to diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dc6457de63713..6754a3d1e2381 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1082,6 +1082,9 @@ class DAGScheduler( handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) } + case TaskCommitDenied(jobID, splitID, attemptID) => + // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits + case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) => // Do nothing here, left up to the TaskScheduler to decide how to handle user failures diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 7d4b7a3d51b87..6af0c3acc23a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -17,16 +17,15 @@ package org.apache.spark.scheduler -import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap} +import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap} import scala.collection.{Map => ScalaImmutableMap} -import scala.collection.concurrent.{Map => ScalaConcurrentMap} import scala.collection.convert.decorateAsScala._ import akka.actor.{ActorRef, Actor} import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.util.{AkkaUtils, ActorLogReceive} +import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive} private[spark] sealed trait OutputCommitCoordinationMessage @@ -119,7 +118,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { coordinatorActor = Some(actor) executorRequestHandlingThreadPool = { if (isDriver) { - Some(Executors.newFixedThreadPool(4)) + Some(Utils.newDaemonFixedThreadPool(8, "OutputCommitCoordinator")) } else { None } @@ -174,8 +173,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { task: TaskId, partId: PartitionId, attempt: TaskAttemptId): Unit = { - executorRequestHandlingThreadPool.foreach(_.submit( - new AskCommitRunnable(requester, this, stage, task, partId, attempt))) + executorRequestHandlingThreadPool match { + case Some(threadPool) => + threadPool.submit(new AskCommitRunnable(requester, this, stage, task, partId, attempt)) + case None => + logWarning("Got a request to commit output, but the OutputCommitCoordinator was already" + + " shut down. Request is being denied.") + requester ! 
false + } + } private def handleTaskCompletion( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c94c6bbcb37b..727921fcb4642 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -646,6 +646,9 @@ private[spark] class TaskSetManager( s"${ef.className} (${ef.description}) [duplicate $dupCount]") } + case e: TaskCommitDenied => + logWarning(failureReason) + case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others logWarning(failureReason) @@ -657,7 +660,7 @@ private[spark] class TaskSetManager( put(info.executorId, clock.getTime()) sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) addPendingTask(index) - if (!isZombie && state != TaskState.KILLED) { + if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) { assert (null != failureReason) numFailures(index) += 1 if (numFailures(index) >= maxTaskFailures) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8df46f34d59c8..6d8389c5db03e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -20,12 +20,11 @@ package org.apache.spark.scheduler import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls +import org.mockito.Mockito.mock import org.scalatest.{BeforeAndAfter, FunSuiteLike} import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.mockito.Mockito.mock - import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode From 9fe64953aa437ed1ed88a294e04129afc8f2bbb5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Jan 2015 11:18:49 -0800 Subject: [PATCH 16/19] Fixing scalastyle --- .../main/scala/org/apache/spark/CommitDeniedException.scala | 3 ++- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/CommitDeniedException.scala index 3b5bd0c8622e5..3a910d4339265 100644 --- a/core/src/main/scala/org/apache/spark/CommitDeniedException.scala +++ b/core/src/main/scala/org/apache/spark/CommitDeniedException.scala @@ -28,4 +28,5 @@ import org.apache.spark.annotation.DeveloperApi class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int) extends Exception(msg) { def toTaskEndReason(): TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID) -} \ No newline at end of file +} + diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index e9d791eccdb9c..9ce2b10a6c4ae 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -127,7 +127,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) throw new CommitDeniedException(msg, jobID, splitID, attemptID) } } else { - val msg: String = s"No need to commit output of task because needsTaskCommit=false: ${taID.value}" + val msg: String = s"No need to commit output of task" + + " because 
needsTaskCommit=false: ${taID.value}" logInfo(msg) throw new CommitDeniedException(msg, jobID, splitID, attemptID) } From d63f63f5769a29d9377b15f3025726477226ca88 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Jan 2015 11:50:31 -0800 Subject: [PATCH 17/19] Fixing compiler error --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 9ce2b10a6c4ae..020b6c14d2399 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -127,8 +127,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) throw new CommitDeniedException(msg, jobID, splitID, attemptID) } } else { - val msg: String = s"No need to commit output of task" - + " because needsTaskCommit=false: ${taID.value}" + val msg: String = s"No need to commit output of task" + + " because needsTaskCommit=false: ${taID.value}" logInfo(msg) throw new CommitDeniedException(msg, jobID, splitID, attemptID) } From 60a47f4cd71d1ada6750b35e1b9e16e37c1caab4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 29 Jan 2015 15:33:01 -0800 Subject: [PATCH 18/19] Writing proper unit test for OutputCommitCoordinator and fixing bugs. --- .../scala/org/apache/spark/SparkEnv.scala | 3 +- .../org/apache/spark/SparkHadoopWriter.scala | 9 +- .../apache/spark/scheduler/DAGScheduler.scala | 9 +- .../scheduler/OutputCommitCoordinator.scala | 178 +++++------------- .../spark/scheduler/TaskSchedulerImpl.scala | 9 +- .../spark/scheduler/TaskSetManager.scala | 31 +-- .../OutputCommitCoordinatorSuite.scala | 166 +++++++++------- 7 files changed, 180 insertions(+), 225 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 646cf050f0e15..5526eea39f643 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -353,8 +353,7 @@ object SparkEnv extends Logging { val outputCommitCoordinator = new OutputCommitCoordinator(conf) val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", new OutputCommitCoordinatorActor(outputCommitCoordinator)) - outputCommitCoordinator.initialize(outputCommitCoordinatorActor, isDriver) - + outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor) new SparkEnv( executorId, actorSystem, diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 020b6c14d2399..10db43624330f 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -108,8 +108,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator - val canCommit = outputCommitCoordinator.canCommit(jobID, - taID.value.getTaskID().getId(), splitID, attemptID) + val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID) if (canCommit) { try { cmtr.commitTask(taCtxt) @@ -124,13 +123,11 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } else { val msg: String = s"$taID: Not committed because the driver did not authorize commit" logInfo(msg) + cmtr.abortTask(taCtxt) throw new 
CommitDeniedException(msg, jobID, splitID, attemptID) } } else { - val msg: String = s"No need to commit output of task" + - " because needsTaskCommit=false: ${taID.value}" - logInfo(msg) - throw new CommitDeniedException(msg, jobID, splitID, attemptID) + logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6754a3d1e2381..81c6b2beb2879 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -810,7 +810,7 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) - outputCommitCoordinator.stageStart(stage.id, partitionsToCompute) + outputCommitCoordinator.stageStart(stage.id) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. @@ -912,11 +912,9 @@ class DAGScheduler( val task = event.task val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) - val isSuccess = event.reason == Success - outputCommitCoordinator.taskCompleted(stageId, event.taskInfo.taskId, - task.partitionId, event.taskInfo.attempt, - isSuccess) + outputCommitCoordinator.taskCompleted(stageId, task.partitionId, + event.taskInfo.attempt, event.reason) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. @@ -930,6 +928,7 @@ class DAGScheduler( // Skip all the actions if the stage has been cancelled. 
return } + val stage = stageIdToStage(task.stageId) def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 6af0c3acc23a8..4020ece7b28ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -17,37 +17,31 @@ package org.apache.spark.scheduler -import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap} - -import scala.collection.{Map => ScalaImmutableMap} -import scala.collection.convert.decorateAsScala._ +import scala.collection.mutable import akka.actor.{ActorRef, Actor} -import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive} +import org.apache.spark._ +import org.apache.spark.util.{AkkaUtils, ActorLogReceive} -private[spark] sealed trait OutputCommitCoordinationMessage +private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable -private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int]) - extends OutputCommitCoordinationMessage +private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage private[spark] case class AskPermissionToCommitOutput( stage: Int, task: Long, - partId: Int, taskAttempt: Long) - extends OutputCommitCoordinationMessage with Serializable + extends OutputCommitCoordinationMessage private[spark] case class TaskCompleted( stage: Int, task: Long, - partId: Int, attempt: Long, - successful: Boolean) - extends OutputCommitCoordinationMessage + reason: TaskEndReason) + extends OutputCommitCoordinationMessage /** * Authority that decides whether tasks can commit output to HDFS. @@ -57,157 +51,100 @@ private[spark] case class TaskCompleted( */ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { - private type StageId = Int - private type PartitionId = Int - private type TaskId = Long - private type TaskAttemptId = Long - - // Wrapper for an int option that allows it to be locked via a synchronized block - // while still setting option itself to Some(...) or None. 
- private class LockableAttemptId(var value: Option[TaskAttemptId]) - - private type CommittersByStageHashMap = - ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]] - // Initialized by SparkEnv - private var coordinatorActor: Option[ActorRef] = None + var coordinatorActor: Option[ActorRef] = None private val timeout = AkkaUtils.askTimeout(conf) private val maxAttempts = AkkaUtils.numRetries(conf) private val retryInterval = AkkaUtils.retryWaitMs(conf) - private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala - private var executorRequestHandlingThreadPool: Option[ExecutorService] = None + private type StageId = Int + private type TaskId = Long + private type TaskAttemptId = Long + private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] - def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = { - sendToActor(StageStarted(stage, partitionIds)) - } + private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map() - def stageEnd(stage: StageId): Unit = { + def stageStart(stage: StageId) { + sendToActor(StageStarted(stage)) + } + def stageEnd(stage: StageId) { sendToActor(StageEnded(stage)) } def canCommit( stage: StageId, task: TaskId, - partId: PartitionId, attempt: TaskAttemptId): Boolean = { - askActor(AskPermissionToCommitOutput(stage, task, partId, attempt)) + askActor(AskPermissionToCommitOutput(stage, task, attempt)) } def taskCompleted( stage: StageId, task: TaskId, - partId: PartitionId, attempt: TaskAttemptId, - successful: Boolean): Unit = { - sendToActor(TaskCompleted(stage, task, partId, attempt, successful)) + reason: TaskEndReason) { + sendToActor(TaskCompleted(stage, task, attempt, reason)) } - def stop(): Unit = { - executorRequestHandlingThreadPool.foreach { pool => - pool.shutdownNow() - pool.awaitTermination(10, TimeUnit.SECONDS) - } + def stop() { sendToActor(StopCoordinator) coordinatorActor = None - executorRequestHandlingThreadPool = None + authorizedCommittersByStage.foreach(_._2.clear) authorizedCommittersByStage.clear } - def initialize(actor: ActorRef, isDriver: Boolean): Unit = { - coordinatorActor = Some(actor) - executorRequestHandlingThreadPool = { - if (isDriver) { - Some(Utils.newDaemonFixedThreadPool(8, "OutputCommitCoordinator")) - } else { - None - } - } - } - - // Methods that mutate the internal state of the coordinator shouldn't be - // called directly, and are thus made private instead of public. The - // private methods should be called from the Actor, and callers use the - // public methods to send messages to the actor. 
- private def handleStageStart(stage: StageId, partitionIds: Seq[Int]): Unit = { - val initialLockStates = partitionIds.map(partId => { - partId -> new LockableAttemptId(None) - }).toMap - authorizedCommittersByStage.put(stage, initialLockStates) + private def handleStageStart(stage: StageId): Unit = { + authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]() } private def handleStageEnd(stage: StageId): Unit = { authorizedCommittersByStage.remove(stage) } - private def determineIfCommitAllowed( + private def handleAskPermissionToCommit( stage: StageId, task: TaskId, - partId: PartitionId, - attempt: TaskAttemptId): Boolean = { + attempt: TaskAttemptId): + Boolean = { authorizedCommittersByStage.get(stage) match { case Some(authorizedCommitters) => - val authorizedCommitMetadataForPart = authorizedCommitters(partId) - authorizedCommitMetadataForPart.synchronized { - // Don't use match - we'll be setting the value of the option in the else block - if (authorizedCommitMetadataForPart.value.isDefined) { - val existingCommitter = authorizedCommitMetadataForPart.value.get + authorizedCommitters.get(stage) match { + case Some(existingCommitter) => logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " + s"existingCommitter = $existingCommitter") false - } else { + case None => logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task") - authorizedCommitMetadataForPart.value = Some(attempt) + authorizedCommitters(task) = attempt true - } } case None => logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit") - false + return false } } - private def handleAskPermissionToCommitOutput( - requester: ActorRef, - stage: StageId, - task: TaskId, - partId: PartitionId, - attempt: TaskAttemptId): Unit = { - executorRequestHandlingThreadPool match { - case Some(threadPool) => - threadPool.submit(new AskCommitRunnable(requester, this, stage, task, partId, attempt)) - case None => - logWarning("Got a request to commit output, but the OutputCommitCoordinator was already" + - " shut down. Request is being denied.") - requester ! false - } - - } - private def handleTaskCompletion( stage: StageId, task: TaskId, - partId: PartitionId, attempt: TaskAttemptId, - successful: Boolean): Unit = { + reason: TaskEndReason): Unit = { authorizedCommittersByStage.get(stage) match { case Some(authorizedCommitters) => - val authorizedCommitMetadataForPart = authorizedCommitters(partId) - authorizedCommitMetadataForPart.synchronized { - if (authorizedCommitMetadataForPart.value == Some(attempt) && !successful) { - logDebug(s"Authorized committer $attempt (stage=$stage," + - s" task=$task) failed; clearing lock") - // The authorized committer failed; clear the lock so future attempts can - // commit their output - authorizedCommitMetadataForPart.value = None - } + reason match { + case Success => return + case TaskCommitDenied(jobID, splitID, attemptID) => + logInfo(s"Task was denied committing, stage: $stage, taskId: $task, attempt: $attempt") + case otherReason => + logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed; clearing lock") + authorizedCommitters.remove(task) } case None => logDebug(s"Ignoring task completion for completed stage") } } - private def sendToActor(msg: OutputCommitCoordinationMessage): Unit = { + private def sendToActor(msg: OutputCommitCoordinationMessage) { coordinatorActor.foreach(_ ! 
msg) } @@ -216,39 +153,22 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { .map(AkkaUtils.askWithReply[Boolean](msg, _, maxAttempts, retryInterval, timeout)) .getOrElse(false) } - - class AskCommitRunnable( - private val requester: ActorRef, - private val outputCommitCoordinator: OutputCommitCoordinator, - private val stage: StageId, - private val task: TaskId, - private val partId: PartitionId, - private val taskAttempt: TaskAttemptId) - extends Runnable { - override def run(): Unit = { - requester ! outputCommitCoordinator.determineIfCommitAllowed(stage, task, partId, taskAttempt) - } - } } private[spark] object OutputCommitCoordinator { - // Actor is defined inside the OutputCommitCoordinator object so that receiveWithLogging() - // can call the private methods, where it is safe to do so because it is in the actor event - // loop. class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator) extends Actor with ActorLogReceive with Logging { - override def receiveWithLogging() = { - case StageStarted(stage, partitionIds) => - outputCommitCoordinator.handleStageStart(stage, partitionIds) + override def receiveWithLogging = { + case StageStarted(stage) => + outputCommitCoordinator.handleStageStart(stage) case StageEnded(stage) => outputCommitCoordinator.handleStageEnd(stage) - case AskPermissionToCommitOutput(stage, task, partId, taskAttempt) => - outputCommitCoordinator.handleAskPermissionToCommitOutput( - sender, stage, task, partId, taskAttempt) - case TaskCompleted(stage, task, partId, attempt, successful) => - outputCommitCoordinator.handleTaskCompletion(stage, task, partId, attempt, successful) + case AskPermissionToCommitOutput(stage, task, taskAttempt) => + sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt) + case TaskCompleted(stage, task, attempt, reason) => + outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, reason) case StopCoordinator => logInfo("OutputCommitCoordinator stopped!") context.stop(self) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a1dfb01062591..d49792d774c9b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -158,7 +158,7 @@ private[spark] class TaskSchedulerImpl( val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { - val manager = new TaskSetManager(this, taskSet, maxTaskFailures) + val manager = createTaskSetManager(taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) @@ -180,6 +180,13 @@ private[spark] class TaskSchedulerImpl( backend.reviveOffers() } + // Label as private[scheduler] to allow tests to swap in different task set managers if necessary + private[scheduler] def createTaskSetManager( + taskSet: TaskSet, + maxTaskFailures: Int): TaskSetManager = { + new TaskSetManager(this, taskSet, maxTaskFailures) + } + override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 
727921fcb4642..afd40cb4bf1e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -292,7 +292,8 @@ private[spark] class TaskSetManager( * an attempt running on this host, in case the host is slow. In addition, the task should meet * the given locality constraint. */ - private def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) + // Labeled as protected to allow tests to override providing speculative tasks if necessary + protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set @@ -596,7 +597,9 @@ private[spark] class TaskSetManager( removeRunningTask(tid) info.markFailed() val index = info.index - copiesRunning(index) -= 1 + if (copiesRunning(index) >= 1) { + copiesRunning(index) -= 1 + } var taskMetrics : TaskMetrics = null val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + @@ -651,6 +654,7 @@ private[spark] class TaskSetManager( case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others logWarning(failureReason) + return case e: TaskEndReason => logError("Unknown TaskEndReason: " + e) @@ -659,17 +663,20 @@ private[spark] class TaskSetManager( failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). put(info.executorId, clock.getTime()) sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) - addPendingTask(index) - if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) { - assert (null != failureReason) - numFailures(index) += 1 - if (numFailures(index) >= maxTaskFailures) { - logError("Task %d in stage %s failed %d times; aborting job".format( - index, taskSet.id, maxTaskFailures)) - abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" - .format(index, taskSet.id, maxTaskFailures, failureReason)) - return + if (!reason.isInstanceOf[TaskCommitDenied]) { + addPendingTask(index) + if (!isZombie && state != TaskState.KILLED) { + assert (null != failureReason) + numFailures(index) += 1 + if (numFailures(index) >= maxTaskFailures) { + logError("Task %d in stage %s failed %d times; aborting job".format( + index, taskSet.id, maxTaskFailures)) + abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" + .format(index, taskSet.id, maxTaskFailures, failureReason)) + return + } } + } maybeFinishTaskSet() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 19371c69fce1d..eed7f09919783 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -17,96 +17,121 @@ package org.apache.spark.scheduler -import java.io.{ObjectInputStream, ObjectOutputStream, IOException} - -import scala.collection.mutable +import java.io.{File, ObjectInputStream, ObjectOutputStream, IOException} +import org.mockito.Matchers import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.concurrent.Timeouts import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.hadoop.mapred.{TaskAttemptID, 
JobConf, TaskAttemptContext, OutputCommitter} import org.apache.spark._ -import org.apache.spark.executor.{TaskMetrics} import org.apache.spark.rdd.FakeOutputCommitter +import org.apache.spark.util.Utils + +import scala.collection.mutable.ArrayBuffer /** - * Unit tests for the output commit coordination functionality. Overrides the - * SchedulerImpl to just run the tasks directly and send completion or error - * messages back to the DAG scheduler. + * Unit tests for the output commit coordination functionality. + * + * The unit test makes both the original task and the speculated task + * attempt to commit, where committing is emulated by creating a + * directory. If both tasks create directories then the end result is + * a failure. + * + * Note that there are some aspects of this test that are less than ideal. + * In particular, the test mocks the speculation-dequeuing logic to always + * dequeue a task and consider it as speculated. Immediately after initially + * submitting the tasks and calling reviveOffers(), reviveOffers() is invoked + * again to pick up the speculated task. This may be hacking the original + * behavior in too much of an unrealistic fashion. + * + * Also, the validation is done by checking the number of files in a directory. + * Ideally, an accumulator would be used for this, where we could increment + * the accumulator in the output committer's commitTask() call. If the call to + * commitTask() was called twice erroneously then the test would ideally fail because + * the accumulator would be incremented twice. + * + * The problem with this test implementation is that when both a speculated task and + * its original counterpart complete, only one of the accumulator's increments is + * captured. This results in a paradox where if the OutputCommitCoordinator logic + * was not in SparkHadoopWriter, the tests would still pass because only one of the + * increments would be captured even though the commit in both tasks was executed + * erroneously. */ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter - with LocalSparkContext with Timeouts { - val conf = new SparkConf().set("spark.localExecution.enabled", "true") + val conf = new SparkConf() + .set("spark.localExecution.enabled", "true") - var taskScheduler: TaskSchedulerImpl = null var dagScheduler: DAGScheduler = null - var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null - var accum: Accumulator[Int] = null - var accumId: Long = 0 + var tempDir: File = null + var tempDirPath: String = null + var sc: SparkContext = null before { - sc = new SparkContext("local", "Output Commit Coordinator Suite") - accum = sc.accumulator[Int](0) - Accumulators.register(accum, true) - accumId = accum.id - - taskScheduler = new TaskSchedulerImpl(sc, 4, true) { - override def submitTasks(taskSet: TaskSet) { - // Instead of submitting a task to some executor, just run the task directly. - // Make two attempts. The first may or may not succeed. If the first - // succeeds then the second is redundant and should be handled - // accordingly by OutputCommitCoordinator. Otherwise the second - // should not be blocked from succeeding. 
- execTasks(taskSet, 0) - execTasks(taskSet, 1) + sc = new SparkContext("local[4]", "Output Commit Coordinator Suite") + tempDir = Utils.createTempDir() + tempDirPath = tempDir.getAbsolutePath() + // Use Mockito.spy() to maintain the default infrastructure everywhere else + val mockTaskScheduler = spy(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]) + + doAnswer(new Answer[Unit]() { + override def answer(invoke: InvocationOnMock): Unit = { + // Submit the tasks, then, force the task scheduler to dequeue the + // speculated task + invoke.callRealMethod() + mockTaskScheduler.backend.reviveOffers() } - - private def execTasks(taskSet: TaskSet, attemptNumber: Int) { - var taskIndex = 0 - taskSet.tasks.foreach { t => - val tid = newTaskId - val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, "0", - "localhost", TaskLocality.NODE_LOCAL, false) - taskIndex += 1 - // Track the successful commits in an accumulator. However, we can't just invoke - // accum += 1 since this unit test circumvents the usual accumulator updating - // infrastructure. So just send the accumulator update manually. - val accumUpdates = new mutable.HashMap[Long, Any] - try { - accumUpdates(accumId) = t.run(attemptNumber, attemptNumber) - dagSchedulerEventProcessLoop.post( - new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics)) - } catch { - case e: Throwable => - dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e, - Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics)) + }).when(mockTaskScheduler).submitTasks(Matchers.any()) + + doAnswer(new Answer[TaskSetManager]() { + override def answer(invoke: InvocationOnMock): TaskSetManager = { + val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet] + return new TaskSetManager(mockTaskScheduler, taskSet, 4) { + var hasDequeuedSpeculatedTask = false + override def dequeueSpeculativeTask( + execId: String, + host: String, + locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = { + if (!hasDequeuedSpeculatedTask) { + hasDequeuedSpeculatedTask = true + return Some(0, TaskLocality.PROCESS_LOCAL) + } else { + return None + } } } } - } + }).when(mockTaskScheduler).createTaskSetManager(Matchers.any(), Matchers.any()) - dagScheduler = new DAGScheduler(sc, taskScheduler) - taskScheduler.setDAGScheduler(dagScheduler) - sc.dagScheduler = dagScheduler - dagSchedulerEventProcessLoop = new DAGSchedulerSingleThreadedProcessLoop(dagScheduler) + sc.taskScheduler = mockTaskScheduler + val dagSchedulerWithMockTaskScheduler = new DAGScheduler(sc, mockTaskScheduler) + sc.taskScheduler.setDAGScheduler(dagSchedulerWithMockTaskScheduler) + sc.dagScheduler = dagSchedulerWithMockTaskScheduler + } + + after { + sc.stop() + tempDir.delete() } /** * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit */ - private class OutputCommittingFunction + private class OutputCommittingFunction(private var tempDirPath: String) extends ((TaskContext, Iterator[Int]) => Int) with Serializable { def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { val outputCommitter = new FakeOutputCommitter { - override def commitTask(taskAttemptContext: TaskAttemptContext) { - super.commitTask(taskAttemptContext) + override def commitTask(context: TaskAttemptContext) : Unit = { + Utils.createDirectory(tempDirPath) } } runCommitWithProvidedCommitter(ctxt, it, outputCommitter) @@ -128,27 +153,26 @@ class OutputCommitCoordinatorSuite } sparkHadoopWriter.setup(ctxt.stageId, ctxt.partitionId, 
ctxt.attemptNumber) sparkHadoopWriter.commit - if (FakeOutputCommitter.ran) { - FakeOutputCommitter.ran = false - 1 - } else { - 0 - } + 0 } // Need this otherwise the entire test suite attempts to be serialized @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) {} + private def writeObject(out: ObjectOutputStream): Unit = { + out.writeUTF(tempDirPath) + } @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) {} + private def readObject(in: ObjectInputStream): Unit = { + tempDirPath = in.readUTF() + } } /** * Function that will explicitly fail to commit on the first attempt */ - private class FailFirstTimeCommittingFunction - extends OutputCommittingFunction { + private class FailFirstTimeCommittingFunction(private var tempDirPath: String) + extends OutputCommittingFunction(tempDirPath) { override def apply(ctxt: TaskContext, it: Iterator[Int]): Int = { if (ctxt.attemptNumber == 0) { val outputCommitter = new FakeOutputCommitter { @@ -164,14 +188,16 @@ class OutputCommitCoordinatorSuite } test("Only one of two duplicate commit tasks should commit") { - val rdd = sc.parallelize(1 to 10, 10) - sc.runJob(rdd, new OutputCommittingFunction) - assert(accum.value === 10) + val rdd = sc.parallelize(Seq(1), 1) + sc.runJob(rdd, new OutputCommittingFunction(tempDirPath), + 0 until rdd.partitions.size, allowLocal = true) + assert(tempDir.list().size === 1) } test("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) - sc.runJob(rdd, new FailFirstTimeCommittingFunction) - assert(accum.value === 1) + sc.runJob(rdd, new FailFirstTimeCommittingFunction(tempDirPath), + 0 until rdd.partitions.size, allowLocal = true) + assert(tempDir.list().size === 1) } } From 594e41abecf5a48084608ab20112f884f28fc920 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 29 Jan 2015 15:49:40 -0800 Subject: [PATCH 19/19] Fixing a scalastyle error --- .../org/apache/spark/scheduler/OutputCommitCoordinator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 4020ece7b28ad..5f6f6f3422b2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -136,7 +136,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { case TaskCommitDenied(jobID, splitID, attemptID) => logInfo(s"Task was denied committing, stage: $stage, taskId: $task, attempt: $attempt") case otherReason => - logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed; clearing lock") + logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed;" + + s" clearing lock") authorizedCommitters.remove(task) } case None =>