Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4879] Use the Spark driver to authorize Hadoop commits. #4155

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions core/src/main/scala/org/apache/spark/CommitDeniedException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Exception thrown when a task attempts to commit output to Hadoop, but
 * is denied by the driver (the OutputCommitCoordinator refused the commit,
 * e.g. because another attempt for the same partition was already authorized).
 *
 * @param msg human-readable description of why the commit was denied
 * @param jobID ID of the job whose output was being committed
 * @param splitID partition (split) whose commit was denied
 * @param attemptID task attempt that requested the commit
 */
@DeveloperApi
class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)
  extends Exception(msg) {
  // TaskCommitDenied is a case class, so use its companion apply rather than `new`.
  def toTaskEndReason(): TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
}

11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark
import java.io.File
import java.net.Socket

import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties
Expand All @@ -34,7 +36,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
Expand Down Expand Up @@ -67,6 +69,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

private[spark] var isStopped = false
Expand All @@ -86,6 +89,7 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
Expand Down Expand Up @@ -346,6 +350,10 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}

val outputCommitCoordinator = new OutputCommitCoordinator(conf)
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
new OutputCommitCoordinatorActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
new SparkEnv(
executorId,
actorSystem,
Expand All @@ -362,6 +370,7 @@ object SparkEnv extends Logging {
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
outputCommitCoordinator,
conf)
}

Expand Down
28 changes: 19 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.AkkaUtils

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
Expand Down Expand Up @@ -106,18 +107,27 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
if (canCommit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I wonder if this can be a problem. Given the following timeline:

Time ->

(1)--------(2)--------(3)

(4)--------------(5)

1: task 1 start
2. task 1 asks for permission to commit, it's granted
3. task 1 fails to commit
4. task 2 starts (doing same work as task 1)
5. task 2 asks for permission to commit, it's denied

Wouldn't this code force a new task to be run to recompute everything? Also, wouldn't task 2 actually report itself as successful, and break things, since there is a successful task for that particular split, but it was never committed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would force a new task to recompute everything, but this does highlight that task 2 should throw an error, @JoshRosen? Should be okay even if the first task finished successfully - TaskSchedulerImpl explicitly ignores status updates from duplicate tasks, removing the task from its container on the successful case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But another follow-up question - how did we get away with this case before? If committer.needsTaskCommit returns false in SparkHadoopWriter it also does not throw an error. But if some other task that had committer.needsTaskCommit returned true but failed to commit the output, are we in the same situation as what @vanzin described albeit in a slightly different code path?

Or to be more explicit, take the 5 steps but slightly adjust them and assume the code I added in this patch doesn't exist:

1: Task 1 start
2. Task 1 asks committer.needsTaskCommit() which returns true
3. task 1 fails to commit
4. Task 2 starts
5. Task 2 asks committer.needsTaskCommit but that returns false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original committer.needsTaskCommit is a Hadoop API method, so it follows the Hadoop semantics. The API docs for the method are not very helpful, so hopefully somebody knows what they are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoshRosen, @vanzin's concern still holds. I'm going to change this so it throws an exception on the commit denial.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed this discussion since it got collapsed under a changed diff.

I'll have to revisit the docs / usage to confirm this, but I think that the Hadoop needsTaskCommit was a necessary but not sufficient condition for committing where needsTaskCommit should always return true for a task that has not had its output committed (in fact, I think it would be valid for it to always return true, even for already-committed output).

I agree that task 2 should throw an error, since it seems like doing otherwise would also lead to missing output: as soon as the scheduler sees one successful completion for a task, it won't re-run that task, so we need it to be the case that "successful task completion" implies "output committed."

To run with your example, I think that task 2 would see needsTaskCommit = false because the output hasn't been committed yet, but there's still a problem that can lead to missing output. Imagine that we had this interleaving:

1: Tasks 1 and 2 start
2. Task 1 asks committer.needsTaskCommit() which returns true
3. Task 2 asks committer.needsTaskCommit(), which returns false. It exits without throwing an exception and reports success.
4. Task 1 fails to commit

In this case, one copy of the task has reported success even though output was not committed, so that partition will be missing because we won't re-schedule an attempt to commit.

So, I agree with @vanzin: if the DAGScheduler did not authorize the commit, then we should throw an exception. I think that this exception will hopefully be rare in practice because needsTaskCommit should ideally return false for tasks that are definitely committed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Man, github makes it really hard to find this discussion. :-/

Anyway, just one comment:

if the DAGScheduler did not authorize the commit, then we should throw an exception

As long as that doesn't cause the driver's "task error count" go up, then fine. Otherwise, we probably need some new state (e.g. "task gave up") that doesn't count as an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, we probably need some new state (e.g. "task gave up") that doesn't count as an error.

We have the TaskEndReason mechanism for special-handling of certain kinds of task failures, so maybe we can add a new reason and update the corresponding handlers in TaskSetManager and DAGScheduler.

We might still end up having to throw an exception, but we can throw a more specific one that we can catch and transform into the right TaskEndReason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, there's code in Executor.scala to catch certain exceptions and translate them into TaskEndReasons: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L243

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

I'm trying to implement this change now but am getting tripped up in unit tests again. Now, the unit tests need to capture the logic all the way down to the Executor. It looks like this workflow requires something closer to an end-to-end test now. Are there any unit tests that test scenarios with speculation? How should my existing OutputCommitCoordinator suite be adjusted?

Basically, we need to "force" speculation to happen, emulating that a task set is resubmitted.

try {
cmtr.commitTask(taCtxt)
logInfo (s"$taID: Committed")
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}
} else {
val msg: String = s"$taID: Not committed because the driver did not authorize commit"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be calling abortTask in this case as well.

logInfo(msg)
cmtr.abortTask(taCtxt)
throw new CommitDeniedException(msg, jobID, splitID, attemptID)
}
} else {
logInfo ("No need to commit output of task: " + taID.value)
logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
}
}

Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
}

/**
 * :: DeveloperApi ::
 * Task requested the driver to commit, but was denied.
 *
 * @param jobID ID of the job whose task was denied the commit
 * @param splitID partition (split) the task was writing
 * @param attemptID task attempt whose commit request was refused
 */
@DeveloperApi
case class TaskCommitDenied(
    jobID: Int,
    splitID: Int,
    attemptID: Int)
  extends TaskFailedReason {
  // Single interpolated string: the original concatenated two literals, the first
  // carrying a redundant `s` interpolator with nothing to interpolate. The
  // resulting message is byte-identical.
  override def toErrorString: String =
    s"TaskCommitDenied (Driver denied task commit) for job: $jobID, split: $splitID, " +
      s"attempt: $attemptID"
}

/**
* :: DeveloperApi ::
* The task failed because the executor that it was running on was lost. This may happen because
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}

case cDE: CommitDeniedException => {
val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}

case t: Throwable => {
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils}
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
Expand All @@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
clock: org.apache.spark.util.Clock = SystemClock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there another conflicting Clock imported?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark has a few different Clock classes, so maybe this was to disambiguate: https://issues.apache.org/jira/browse/SPARK-4682

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this was weird, it wouldn't compile without me quantifying the class here. Although that could be IntelliJ organizing my imports in a non-ideal way.

extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
Expand Down Expand Up @@ -126,6 +126,8 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator

// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
Expand Down Expand Up @@ -808,6 +810,7 @@ class DAGScheduler(
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
outputCommitCoordinator.stageStart(stage.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it wouldn't be better to use a SparkListener to reduce coupling. Although that would potentially introduce race conditions in the code (since LiveListenerBus fires events on a separate thread).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with @vanzin's opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this introduces a race between commit requests and the stage start event. If the listener bus is slow in delivering events, then it's possible that the output commit coordinator could receive a commit request via Akka for a stage that it doesn't know about yet.

listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
Expand Down Expand Up @@ -865,6 +868,7 @@ class DAGScheduler(
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
Expand Down Expand Up @@ -909,6 +913,9 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
Expand All @@ -921,6 +928,7 @@ class DAGScheduler(
// Skip all the actions if the stage has been cancelled.
return
}

val stage = stageIdToStage(task.stageId)

def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
Expand Down Expand Up @@ -1073,6 +1081,9 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
}

case TaskCommitDenied(jobID, splitID, attemptID) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures

Expand Down
Loading