[SPARK-4879] Use driver to coordinate Hadoop output committing for speculative tasks #4066
Conversation
Test build #25629 timed out for PR 4066 at commit
} else {
  logInfo (s"$taID: Not committed because DAGScheduler did not authorize commit")
}
} catch {
  case e: IOException => {
    logError("Error committing the output of task: " + taID.value, e)
I guess we need to catch TimeoutException here, too.
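For illustration, a rough sketch of what catching the timeout might look like in this commit path. This is a fragment, not the final diff: it reuses the identifiers from the snippet above (`taID`, `taCtxt`, `cmtr`, `canCommit`) and assumes the ask surfaces a `TimeoutException` when the driver does not answer within `askTimeout`.

```scala
import java.io.IOException
import java.util.concurrent.TimeoutException

try {
  if (canCommit) {
    cmtr.commitTask(taCtxt)
    logInfo(taID + ": Committed")
  } else {
    logInfo(s"$taID: Not committed because DAGScheduler did not authorize commit")
  }
} catch {
  case e: IOException =>
    logError("Error committing the output of task: " + taID.value, e)
  case e: TimeoutException =>
    // Assumption: treat a timed-out ask as "commit not authorized" rather than failing silently.
    logError("Timed out asking the driver for permission to commit task: " + taID.value, e)
}
```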
Test build #25640 has finished for PR 4066 at commit
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
val canCommit: Boolean = AkkaUtils.askWithReply(
  AskPermissionToCommitOutput(jobID, splitID, attemptID), dagSchedulerActor, askTimeout)
If we do this for every task, not just speculated ones, will this bottleneck at the driver? Can we isolate the ask-for-commit logic to only tasks that have a copy via speculation? Not sure how I would architect that though.
You can't know in advance which tasks might have speculated copies, so I think you could only enable/disable this depending on whether speculation was enabled, rather than on a per-task or per-job basis. Note that this only affects ResultTasks that save output to Hadoop, such as saveAsTextFile.
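For what it's worth, the gating that eventually landed (per the final commit message further down) keys off `spark.speculation`, with an explicit override key. A minimal sketch of that check, assuming a `SparkConf` in scope; the helper name below is illustrative, not the patch's API.

```scala
import org.apache.spark.SparkConf

// Hedged sketch: only consult the driver-side coordinator when speculation is on,
// unless the user explicitly overrides via spark.hadoop.outputCommitCoordination.enabled.
def shouldCoordinateWithDriver(conf: SparkConf): Boolean = {
  val speculationEnabled = conf.getBoolean("spark.speculation", defaultValue = false)
  // Both config keys appear in this PR's description; defaulting the override to the
  // speculation setting mirrors the behavior described there.
  conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}
```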
When the driver determines that a task should have a speculated copy, it could send a message to the executor that is running the original copy of the task. The executors running the two versions of the task would then each know that their commits are unsafe. It would be tricky to avoid a second race in this solution, however.
Whether or not such an extra layer of complexity and coordination is necessary, it would be good to run a benchmarking suite against this patch (or some version of it) before we merge it. Even though it only affects Hadoop-writing result tasks, it's on a critical path, and we need a good understanding of the performance implications, particularly when saving large RDDs.
A large fraction of the test failures in DAGSchedulerSuite are because it passes
Test build #25679 timed out for PR 4066 at commit
Instead of having every task require a callback to the driver or master, can the master broadcast to the executors that a task is being speculated, so that any executor with a copy of that task does a commit verification? Just throwing the idea out there.
@mccheah I agree that we could add optimizations to skip this coordination in scenarios where we know that there will only be one copy of a task, but it might be a good idea to defer those optimizations to a separate patch if it turns out that the performance impact is small. Once we've got a version of this patch that passes tests, let's benchmark it and see how costly this extra coordination is.
Jenkins, retest this please.
(Just so we can see the test failure message again)
Did you think of any corner cases that you might have missed? In terms of correctness, this seems okay (although the Jenkins build indicates there are some issues). Have you thought about testing? I'm trying to brainstorm how to unit test this as well.
Hmm, let me think about this a bit... I don't think that we have any end-to-end tests with task speculation right now, since Spark won't schedule speculated tasks on executors that are running on the same host. However, I don't think that end-to-end tests should be the only/best way to test this, since there are a few special cases that require us to have finer-grained control over the event interleaving. Here are some of the scenarios that I'd like to test:
Since this involves an interplay between DAGScheduler logic, OutputCommitCoordinator, and actual tasks, we might have to use a mock TaskSchedulerBackend and a mock task to test this.
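One way to pin down the semantics without a full end-to-end run is to test the arbitration rule directly. A hypothetical ScalaTest sketch: the `StubArbiter` and the `canCommit(stage, partition, attempt)` shape are stand-ins for whatever the real coordinator exposes, not code from this patch.

```scala
import org.scalatest.FunSuite
import scala.collection.mutable

// Tiny in-memory stand-in used only to express the semantics the real tests should check;
// it is not the patch's OutputCommitCoordinator.
class StubArbiter {
  private val winners = mutable.Map.empty[(Int, Int), Long] // (stage, partition) -> attempt
  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    winners.getOrElseUpdate((stage, partition), attempt) == attempt
  }
}

class CommitAuthorizationSemanticsSuite extends FunSuite {
  test("only the first attempt asking to commit a partition is authorized") {
    val arbiter = new StubArbiter
    assert(arbiter.canCommit(stage = 0, partition = 0, attempt = 0L))  // first asker wins
    assert(!arbiter.canCommit(stage = 0, partition = 0, attempt = 1L)) // speculative copy denied
    assert(arbiter.canCommit(stage = 0, partition = 1, attempt = 1L))  // other partitions unaffected
  }
}
```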
Test build #25834 has finished for PR 4066 at commit
@@ -105,10 +107,24 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
  def commit() {
    val taCtxt = getTaskContext()
    val cmtr = getOutputCommitter()
    val conf = SparkEnv.get.conf
    val dagSchedulerActor =
      AkkaUtils.makeDriverRef("DAGSchedulerSupervisor/DAGScheduler", conf, SparkEnv.get.actorSystem)
All of the tests that broke did so because of "Name is not unique" errors. I don't know Akka nearly as well as I should, but why was this name added explicitly?
The idea here was modeled after how Executor -> Driver communication works for the HeartbeatReceiver actor.
On the driver, HeartbeatReceiver is started with an explicit name:
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
The executors use this name to construct a reference to the actor:
val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
Looking at the Akka docs:
The name parameter is optional, but you should preferably name your actors, since that is used in log messages and for identifying actors. The name must not be empty or start with $, but it may contain URL encoded characters (eg. %20 for a blank space). If the given name is already in use by another child to the same parent an InvalidActorNameException is thrown.
If you don't pass an explicit name, the actor is assigned an automatically-generated name (reference).
So, my hunch is that we're somehow creating multiple actors with the same name. I wonder if this is due to sharing between tests or improper cleanup, or whether there's something wrong with my use of explicit actor names here.
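A minimal, standalone illustration of that failure mode using plain Akka (2.3-era API, nothing Spark-specific); the actor class, system name, and actor name below are made up for the demo.

```scala
import akka.actor.{Actor, ActorSystem, InvalidActorNameException, Props}

class Dummy extends Actor {
  def receive = { case _ => } // ignore everything
}

object DuplicateNameDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props[Dummy], "DAGScheduler")
  try {
    // Second top-level actor with the same explicit name: Akka rejects it.
    system.actorOf(Props[Dummy], "DAGScheduler")
  } catch {
    case e: InvalidActorNameException =>
      println(s"Name is not unique: ${e.getMessage}")
  } finally {
    system.shutdown() // ActorSystem#shutdown() in Akka 2.3; terminate() in newer versions
  }
}
```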
Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is turned on, sometimes this can result in multiple tasks committing their output to the same partition. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see the aforementioned JIRA ticket.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver, and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well, so that it can update its internal state.
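To make the bookkeeping concrete, here is a deliberately simplified sketch of the arbitration described above: the first attempt to ask for a (stage, partition) wins, and a failed winner releases its claim so another attempt can commit. The class name, fields, and method signatures are illustrative only, not the patch's OutputCommitCoordinator.

```scala
import scala.collection.mutable

// Hedged sketch of driver-side commit arbitration; not the actual OutputCommitCoordinator.
class CommitArbiterSketch {
  // (stageId, partitionId) -> task attempt that has been authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Long]

  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(winner) => winner == attempt       // only the first asker may commit
      case None =>
        authorized((stage, partition)) = attempt   // record the winner
        true
    }
  }

  // Called as the DAGScheduler learns about task completions.
  def taskCompleted(stage: Int, partition: Int, attempt: Long, succeeded: Boolean): Unit =
    synchronized {
      // If the authorized attempt failed, release the claim so a retry can commit.
      if (!succeeded && authorized.get((stage, partition)).exists(_ == attempt)) {
        authorized.remove((stage, partition))
      }
    }

  def stageEnd(stage: Int): Unit = synchronized {
    val stale = authorized.keys.filter(_._1 == stage).toList // drop state for a finished stage
    stale.foreach(authorized.remove)
  }
}
```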
The linked pull request takes your ideas, makes them compatible with master, and adds unit tests. Feel free to take a look.
The SparkHadoopWriter wasn't serializable because the commit coordinator was a class member. It doesn't need to be, so just get it on demand when committing the task.
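As an aside, the underlying serialization issue is easy to reproduce outside Spark. The classes below are minimal stand-ins (none of them are Spark's), showing why holding a non-serializable handle as a field breaks Java serialization and why resolving it on demand avoids the problem.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Coordinator // not Serializable, like an actor ref / RPC handle

class WriterWithField extends Serializable {
  val coordinator = new Coordinator // serialized with the writer -> NotSerializableException
}

class WriterOnDemand extends Serializable {
  def commit(): Unit = {
    val coordinator = lookupCoordinator() // resolved at commit time, never serialized
    // ... ask the coordinator for permission, then commit ...
  }
  private def lookupCoordinator(): Coordinator = new Coordinator // stand-in for a SparkEnv lookup
}

object SerializationDemo extends App {
  def trySerialize(o: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(o); println(s"${o.getClass.getSimpleName}: ok") }
    catch { case e: java.io.NotSerializableException => println(s"${o.getClass.getSimpleName}: $e") }
  }
  trySerialize(new WriterWithField) // fails
  trySerialize(new WriterOnDemand)  // succeeds
}
```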
But we don't care about what's actually in the TaskInfo object - testing OutputCommitCoordinator is done in OutputCommitCoordinatorSuite, which is the only code path that relies on the TaskInfo's fields. Nevertheless, we need the TaskInfo to not be null so that this code path doesn't throw an NPE.
Before, a poison pill message was sent to the actor. That is not the paradigm used for other actors in Spark, though, so this makes it more like, e.g., the MapOutputTracker actor.
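A sketch of that stop pattern: an explicit stop message the actor handles and acknowledges, rather than a PoisonPill. The message and actor names below are illustrative, not the patch's OutputCommitCoordinatorMessage hierarchy, and the API is classic Akka 2.3.

```scala
import akka.actor.Actor

// Illustrative messages; not the exact OutputCommitCoordinatorMessage types.
sealed trait CoordinatorMessage extends Serializable
case class AskPermissionToCommit(stage: Int, partition: Int, attempt: Long) extends CoordinatorMessage
case object StopCoordinator extends CoordinatorMessage

class CoordinatorActorSketch(canCommit: (Int, Int, Long) => Boolean) extends Actor {
  def receive = {
    case AskPermissionToCommit(stage, partition, attempt) =>
      sender() ! canCommit(stage, partition, attempt)
    case StopCoordinator =>
      sender() ! true      // acknowledge the stop so the asker is not left hanging
      context.stop(self)   // stop explicitly instead of relying on a PoisonPill
  }
}
```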
@mccheah Thanks for picking this up. I'm going to close my PR and continue discussion on yours.
override private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
indent these once more
…oopwriter-fix Conflicts: core/src/main/scala/org/apache/spark/SparkEnv.scala
LGTM, I will merge this once tests pass
Test build #27254 has finished for PR 4066 at commit
retest this please
Test build #27264 has finished for PR 4066 at commit
…eculative tasks

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled, sometimes this can result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.

Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark because the bug has serious consequences, that is, data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true. It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by mccheah), which in turn was an updated version of this PR.

Closes #4155.

Author: mcheah <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #4066 from JoshRosen/SPARK-4879-sparkhadoopwriter-fix and squashes the following commits:

658116b [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
ed783b2 [Josh Rosen] Address Andrew’s feedback.
e7be65a [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
14861ea [Josh Rosen] splitID -> partitionID in a few places
ed8b554 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
48d5c1c [Josh Rosen] Roll back copiesRunning change in TaskSetManager
3969f5f [Josh Rosen] Re-enable guarding of commit coordination with spark.speculation setting.
ede7590 [Josh Rosen] Add test to ensure that a job that denies all commits cannot complete successfully.
97da5fe [Josh Rosen] Use actor only for RPC; call methods directly in DAGScheduler.
f582574 [Josh Rosen] Some cleanup in OutputCommitCoordinatorSuite
a7c0e29 [Josh Rosen] Create fake TaskInfo using dummy fields instead of Mockito.
997b41b [Josh Rosen] Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
459310a [Josh Rosen] Roll back TaskSetManager changes that broke other tests.
dd00b7c [Josh Rosen] Move CommitDeniedException to executors package; remove `@DeveloperAPI` annotation.
c79df98 [Josh Rosen] Some misc. code style + doc changes:
f7d69c5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
92e6dc9 [Josh Rosen] Bug fix: use task ID instead of StageID to index into authorizedCommitters.
b344bad [Josh Rosen] (Temporarily) re-enable “always coordinate” for testing purposes.
0aec91e [Josh Rosen] Only coordinate when speculation is enabled; add configuration option to bypass new coordination.
594e41a [mcheah] Fixing a scalastyle error
60a47f4 [mcheah] Writing proper unit test for OutputCommitCoordinator and fixing bugs.
d63f63f [mcheah] Fixing compiler error
9fe6495 [mcheah] Fixing scalastyle
1df2a91 [mcheah] Throwing exception if SparkHadoopWriter commit denied
d431144 [mcheah] Using more concurrency to process OutputCommitCoordinator requests.
c334255 [mcheah] Properly handling messages that could be sent after actor shutdown.
8d5a091 [mcheah] Was mistakenly serializing the accumulator in test suite.
9c6a4fa [mcheah] More OutputCommitCoordinator cleanup on stop()
78eb1b5 [mcheah] Better OutputCommitCoordinatorActor stopping; simpler canCommit
83de900 [mcheah] Making the OutputCommitCoordinatorMessage serializable
abc7db4 [mcheah] TaskInfo can't be null in DAGSchedulerSuite
f135a8e [mcheah] Moving the output commit coordinator from class into method.
1c2b219 [mcheah] Renaming oudated names for test function classes
66a71cd [mcheah] Removing whitespace modifications
6b543ba [mcheah] Removing redundant accumulator in unit test
c9decc6 [mcheah] Scalastyle fixes
bc80770 [mcheah] Unit tests for OutputCommitCoordinator
6e6f748 [mcheah] [SPARK-4879] Use the Spark driver to authorize Hadoop commits.

(cherry picked from commit 1cb3770)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
I only merged this into master and 1.3 for now because there are significant merge conflicts. Thanks @JoshRosen and @mccheah.
…d Parquet tables

This PR leverages the output commit coordinator introduced in #4066 to help committing Hive and Parquet tables. It extracts the output commit code in `SparkHadoopWriter.commit` to `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on the executor side.

TODO
- [ ] Add tests

Author: Cheng Lian <[email protected]>

Closes #5139 from liancheng/spark-6369 and squashes the following commits:

72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables

(cherry picked from commit fde6945)
Signed-off-by: Cheng Lian <[email protected]>
SPARK-4879 also happened when using saveAsNewAPIHadoopFile.
Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled sometimes this can result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well.
This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.
In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.
This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.
Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences, that is, data is lost.
Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true. It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by @mccheah), which in turn was an updated version of this PR.
Closes #4155.
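For anyone toggling this behavior, a small usage example with the settings named above (the app name and values are just an example, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Enable speculation (which, per this patch, turns commit coordination on by default),
// but explicitly opt out of the coordination via the override key described above.
val conf = new SparkConf()
  .setAppName("output-commit-coordination-example")
  .set("spark.speculation", "true")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
```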