[SPARK-4879] Use the Spark driver to authorize Hadoop commits. #4155
New file adding org.apache.spark.CommitDeniedException:

@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Exception thrown when a task attempts to commit output to Hadoop, but
 * is denied by the driver.
 */
@DeveloperApi
class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)
  extends Exception(msg) {
  def toTaskEndReason(): TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)
}
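TaskEndReason and TaskCommitDenied are referenced by toTaskEndReason() but are not part of this file. The following is only a sketch of the shape implied by the call sites in this PR, with the trait stubbed out so the snippet stands on its own; the real hierarchy lives in TaskEndReason.scala and is richer than this.

```scala
// Stand-in for Spark's real (sealed) TaskEndReason hierarchy, only so this sketch compiles.
trait TaskEndReason

// Hypothetical shape of the new end reason: the field names mirror the constructor
// arguments used by CommitDeniedException.toTaskEndReason() above and the pattern
// match added to DAGScheduler later in this diff.
case class TaskCommitDenied(jobID: Int, splitID: Int, attemptID: Int) extends TaskEndReason {
  def message: String =
    s"Driver denied task commit for job $jobID, partition $splitID, attempt $attemptID"
}
```

Having a dedicated end reason (rather than a generic exception) is what lets the scheduler treat a denied commit differently from an ordinary task failure, which is the subject of the discussion further down.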
Changes to SparkHadoopWriter:

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 import org.apache.spark.util.AkkaUtils

 /**
  * Internal helper class that saves an RDD using a Hadoop OutputFormat.

@@ -106,18 +107,27 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo (s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        cmtr.abortTask(taCtxt)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
     }
   }

Review comment (inline, on the "Not committed" branch): We should be calling abortTask in this case as well.
Changes to DAGScheduler:

@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
-import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils}
+import org.apache.spark.util._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

 /**

@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: Clock = SystemClock)
+    clock: org.apache.spark.util.Clock = SystemClock)
   extends Logging {

   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {

Review comments (inline, on the fully qualified Clock type):
- Is there another conflicting Clock class?
- Spark has a few different Clock classes.
- Yeah, this was weird; it wouldn't compile without me fully qualifying the class here. Although that could be IntelliJ organizing my imports in a non-ideal way.

@@ -126,6 +126,8 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)

+  private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))

@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

Review comments (inline, on the stageStart call):
- I wonder if it wouldn't be better to use a SparkListener for this.
- Yes, I agree with @vanzin's opinion.
- I think that this introduces a race between commit requests and the stage start event. If the listener bus is slow in delivering events, then it's possible that the output commit coordinator could receive a commit request via Akka for a stage that it doesn't know about yet.

@@ -865,6 +868,7 @@ class DAGScheduler(
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
+      outputCommitCoordinator.stageEnd(stage.id)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))

@@ -909,6 +913,9 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)

+    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
+      event.taskInfo.attempt, event.reason)
+
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.
     if (event.reason != Success) {

@@ -921,6 +928,7 @@ class DAGScheduler(
       // Skip all the actions if the stage has been cancelled.
       return
     }

     val stage = stageIdToStage(task.stageId)

     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {

@@ -1073,6 +1081,9 @@ class DAGScheduler(
         handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
       }

+      case TaskCommitDenied(jobID, splitID, attemptID) =>
+        // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
+
       case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
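The OutputCommitCoordinator that these hooks (and the canCommit call in SparkHadoopWriter above) talk to is added elsewhere in this PR and does not appear in these hunks. Below is a minimal sketch of the bookkeeping such a coordinator has to do; every name, signature, and simplification is assumed rather than taken from the actual implementation, which sits behind an Akka endpoint and inspects the full TaskEndReason.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Spark's OutputCommitCoordinator: driver-side
// "first attempt to ask wins" arbitration for output commits.
class NaiveCommitAuthorizer {
  // stage -> (partition -> attempt currently authorized to commit)
  private val authorized = mutable.Map[Int, mutable.Map[Int, Int]]()

  def stageStart(stage: Int): Unit = synchronized {
    authorized.getOrElseUpdate(stage, mutable.Map.empty)
  }

  def stageEnd(stage: Int): Unit = synchronized {
    authorized.remove(stage) // nothing left to arbitrate once the stage has finished
  }

  // Called from the executor side (via RPC in the real implementation). The first
  // attempt to ask for a given partition wins; every other attempt is denied.
  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get(stage) match {
      case None => false // unknown stage: refuse rather than guess
      case Some(partitions) =>
        partitions.get(partition) match {
          case Some(winner) => winner == attempt
          case None =>
            partitions(partition) = attempt
            true
        }
    }
  }

  // Called by the DAGScheduler for every task completion. If the authorized attempt
  // failed (simplified here to a Boolean), release the authorization so that a retried
  // or speculative attempt can be allowed to commit instead.
  def taskCompleted(stage: Int, partition: Int, attempt: Int, succeeded: Boolean): Unit =
    synchronized {
      if (!succeeded) {
        authorized.get(stage).foreach { partitions =>
          if (partitions.get(partition).contains(attempt)) {
            partitions.remove(partition)
          }
        }
      }
    }
}
```

The discussion below is about what the losing attempt should do once canCommit returns false (abort and throw CommitDeniedException) and how the driver should classify that outcome.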
Comment: Hmm. I wonder if this can be a problem. Given the following timeline:

1. task 1 starts
2. task 1 asks for permission to commit, it's granted
3. task 1 fails to commit
4. task 2 starts (doing same work as task 1)
5. task 2 asks for permission to commit, it's denied

Wouldn't this code force a new task to be run to recompute everything? Also, wouldn't task 2 actually report itself as successful, and break things, since there is a successful task for that particular split, but it was never committed?
Comment: It would force a new task to recompute everything, but this does highlight that task 2 should throw an error, @JoshRosen? It should be okay even if the first task finished successfully: TaskSchedulerImpl explicitly ignores status updates from duplicate tasks, removing the task from its container in the successful case.
Comment: But another follow-up question: how did we get away with this case before? If committer.needsTaskCommit returns false in SparkHadoopWriter, it also does not throw an error. But if some other task got true from committer.needsTaskCommit and then failed to commit its output, are we in the same situation as what @vanzin described, albeit in a slightly different code path?

Or, to be more explicit, take the 5 steps but slightly adjust them and assume the code I added in this patch doesn't exist:

1. Task 1 starts
2. Task 1 asks committer.needsTaskCommit(), which returns true
3. Task 1 fails to commit
4. Task 2 starts
5. Task 2 asks committer.needsTaskCommit(), but that returns false
Comment: The original committer.needsTaskCommit is a Hadoop API method, so it follows the Hadoop semantics. The API docs for the method are not very helpful, so hopefully somebody knows what they are.

Comment: @JoshRosen, @vanzin's concern still holds. I'm going to change this so it throws an exception on the commit denial.
Comment: Ah, I missed this discussion since it got collapsed under a changed diff.

I'll have to revisit the docs / usage to confirm this, but I think that the Hadoop needsTaskCommit was a necessary but not sufficient condition for committing, where needsTaskCommit should always return true for a task that has not had its output committed (in fact, I think it would be valid for it to always return true, even for already-committed output).

I agree that task 2 should throw an error, since it seems like doing otherwise would also lead to missing output: as soon as the scheduler sees one successful completion for a task, it won't re-run that task, so we need it to be the case that "successful task completion" implies "output committed."

To run with your example, I think that task 2 would see needsTaskCommit = false because the output hasn't been committed yet, but there's still a problem that can lead to missing output. Imagine that we had this interleaving:

1. Tasks 1 and 2 start
2. Task 1 asks committer.needsTaskCommit(), which returns true
3. Task 2 asks committer.needsTaskCommit(), which returns false. It exits without throwing an exception and reports success.
4. Task 1 fails to commit

In this case, one copy of the task has reported success even though output was not committed, so that partition will be missing because we won't re-schedule an attempt to commit.

So, I agree with @vanzin: if the DAGScheduler did not authorize the commit, then we should throw an exception. I think that this exception will hopefully be rare in practice because needsTaskCommit should ideally return false for tasks that are definitely committed.
Comment: Man, github makes it really hard to find this discussion. :-/

Anyway, just one comment: as long as that doesn't cause the driver's "task error count" to go up, then fine. Otherwise, we probably need some new state (e.g. "task gave up") that doesn't count as an error.
Comment: We have the TaskEndReason mechanism for special-handling of certain kinds of task failures, so maybe we can add a new reason and update the corresponding handlers in TaskSetManager and DAGScheduler.

We might still end up having to throw an exception, but we can throw a more specific one that we can catch and transform into the right TaskEndReason.
Comment: For instance, there's code in Executor.scala to catch certain exceptions and translate them into TaskEndReasons: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L243
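As a sketch of that catch-and-translate pattern applied to the new exception (the helper below and its parameters are made up for illustration; the actual change would live in Executor.scala's task-running loop and report through the executor backend):

```scala
// Hypothetical helper, not Spark code: run a task body and, if the driver denied the
// commit, report the dedicated TaskCommitDenied reason instead of a generic failure,
// so the denial is not counted like an ordinary task error.
def runAndReport(
    taskId: Long,
    runTask: () => Unit,
    report: (Long, TaskEndReason) => Unit): Unit = {
  try {
    runTask()
  } catch {
    case cde: CommitDeniedException =>
      report(taskId, cde.toTaskEndReason())
  }
}
```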
Comment: Makes sense.

I'm trying to implement this change now but am getting tripped up in unit tests again. The unit tests now need to capture the logic all the way down to the Executor, so it looks like this workflow requires something closer to an end-to-end test. Are there any unit tests that cover scenarios with speculation? How should my existing OutputCommitCoordinator suite be adjusted?

Basically, we need to "force" speculation to happen, emulating that a task set is resubmitted.
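One way to emulate that contention without a full end-to-end run, sketched against the hypothetical NaiveCommitAuthorizer above rather than the actual OutputCommitCoordinator or its suite, is to drive two attempts of the same partition by hand: only the first is granted, and a later attempt is granted again once the winner is reported as failed.

```scala
// Hypothetical test sketch; a real suite would exercise the Akka/RPC path and the
// DAGScheduler hooks rather than calling the coordinator directly.
val coordinator = new NaiveCommitAuthorizer
coordinator.stageStart(stage = 1)

assert(coordinator.canCommit(stage = 1, partition = 0, attempt = 0))  // first asker wins
assert(!coordinator.canCommit(stage = 1, partition = 0, attempt = 1)) // speculative copy denied

// The winning attempt fails to commit, so its authorization is released...
coordinator.taskCompleted(stage = 1, partition = 0, attempt = 0, succeeded = false)
// ...and a re-run of the task can now be authorized to commit.
assert(coordinator.canCommit(stage = 1, partition = 0, attempt = 1))
```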