
[SPARK-4879] Use driver to coordinate Hadoop output committing for speculative tasks #4066

Closed

Conversation

JoshRosen
Contributor

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled, this can sometimes result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well.

This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.
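In rough pseudocode, the driver-side bookkeeping amounts to "first committer per partition wins." The sketch below is only illustrative, with simplified names; the actual patch routes these messages through an Akka actor and ties the bookkeeping into DAGScheduler task-completion events.

import scala.collection.mutable

case class AskPermissionToCommitOutput(jobId: Int, partitionId: Int, attemptId: Long)

class CommitArbiterSketch {
  // (job, partition) -> attempt that has been authorized to commit
  private val authorizedCommitters = mutable.Map[(Int, Int), Long]()

  // Called on the driver when an executor asks whether its attempt may commit.
  def handleAskPermission(msg: AskPermissionToCommitOutput): Boolean = synchronized {
    val key = (msg.jobId, msg.partitionId)
    authorizedCommitters.get(key) match {
      case Some(winner) => winner == msg.attemptId  // only the first authorized attempt may commit
      case None =>
        authorizedCommitters(key) = msg.attemptId   // first asker wins
        true
    }
  }

  // Called when the DAGScheduler reports that the authorized attempt failed
  // before finishing its commit, so another attempt can take over.
  def attemptFailed(jobId: Int, partitionId: Int, attemptId: Long): Unit = synchronized {
    if (authorizedCommitters.get((jobId, partitionId)).contains(attemptId)) {
      authorizedCommitters.remove((jobId, partitionId))
    }
  }
}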

Future work includes more rigorous unit testing and extra optimizations, should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It is also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences: data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true. It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.
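For example, keeping speculation on while opting out of the new coordination would look roughly like this (illustrative usage only; the two configuration keys are the ones named above):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("speculative-save-example")  // hypothetical app name
  .set("spark.speculation", "true")
  // Opt out of the new driver-side commit coordination:
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")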

This patch is an updated version of #4155 (by @mccheah), which in turn was an updated version of this PR.

Closes #4155.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25629 timed out for PR 4066 at commit c25c997 after a configured wait of 120m.

  } else {
    logInfo (s"$taID: Not committed because DAGScheduler did not authorize commit")
  }
} catch {
  case e: IOException => {
    logError("Error committing the output of task: " + taID.value, e)
Contributor Author

I guess we need to catch TimeoutException here, too.
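Something along these lines, maybe (only a sketch; askForPermission and commit stand in for the real calls around this hunk):

import java.io.IOException
import java.util.concurrent.TimeoutException

// Sketch only: the ask to the driver can time out, in addition to the
// committer throwing an IOException, so both exception types need a catch clause.
def tryCommit(askForPermission: () => Boolean, commit: () => Unit): Unit = {
  try {
    if (askForPermission()) {
      commit()
    } else {
      println("Not committed because the driver did not authorize the commit")
    }
  } catch {
    case e: IOException =>
      println(s"Error committing task output: $e")
    case e: TimeoutException =>
      // Timed out waiting for the driver's answer; safest not to commit.
      println(s"Timed out asking the driver for permission to commit: $e")
  }
}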

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25640 has finished for PR 4066 at commit 8c64d12.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AskPermissionToCommitOutput(

cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
val canCommit: Boolean = AkkaUtils.askWithReply(
  AskPermissionToCommitOutput(jobID, splitID, attemptID), dagSchedulerActor, askTimeout)
Contributor

If we do this for every task, not just speculated ones, will this become a bottleneck at the driver? Can we isolate the ask-for-commit logic to only tasks that have a copy via speculation? I'm not sure how I would architect that, though.

Contributor Author

You can't know in advance which tasks might have speculated copies, so I think you could only enable/disable this depending on whether speculation is enabled, rather than on a per-task or per-job basis. Note that this only affects ResultTasks that save output to Hadoop, such as saveAsTextFile.
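Something like this, roughly (a sketch; exactly where this check ends up living may differ):

import org.apache.spark.SparkConf

// Only coordinate commits through the driver when speculation is on and the
// user hasn't explicitly opted out of the coordination.
def shouldCoordinateCommits(conf: SparkConf): Boolean = {
  conf.getBoolean("spark.speculation", false) &&
    conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", true)
}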

Contributor

You could send a message from the driver node, when it determines that a task should have a speculated copy, to the executor that was running the original copy of the task. The executors running the two versions of the task would each know that their commits are unsafe. It would be tricky to avoid a second race in this solution, however.

Contributor

Whether or not such an extra layer of complexity and coordination is necessary, it would be good to run a benchmarking suite against this patch (or some version of it) before we merge it. Even though this only affects Hadoop-writing result tasks, it's on a critical path and we need a good understanding of the performance implications, particularly when saving large RDDs.

@JoshRosen
Contributor Author

A large fraction of the test failures in DAGSchedulerSuite occur because it passes null for CompletionEvent.taskInfo. I don't think this field is actually nullable in real Spark jobs, so we should probably fix up those tests to include mock TaskInfo objects in the events that they construct.
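For example, something like this in the suite (just a sketch; the eventual fix might instead construct a real TaskInfo with dummy fields rather than use Mockito):

import org.mockito.Mockito.mock
import org.apache.spark.scheduler.TaskInfo

// Use a mocked TaskInfo rather than null so that any code reading the
// event's taskInfo fields doesn't hit a NullPointerException.
val fakeTaskInfo: TaskInfo = mock(classOf[TaskInfo])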

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25679 timed out for PR 4066 at commit 63a7707 after a configured wait of 120m.

@mccheah
Contributor

mccheah commented Jan 20, 2015

Instead of having every task require a call back to the driver or master, can the master broadcast to the executors that a task is being speculated, so that any executor with a copy of that task does a commit verification? Just throwing the idea out there.

@JoshRosen
Contributor Author

@mccheah I agree that we could add optimizations to skip this coordination in scenarios where we know that there will only be one copy of a task, but it might be a good idea to defer those optimizations to a separate patch if it turns out that the performance impact is small. Once we've got a version of this patch that passes tests, let's benchmark it and see how costly this extra coordination is.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

(Just so we can see the test failure message again)

@mccheah
Contributor

mccheah commented Jan 20, 2015

Did you think of any corner cases that you might have missed? In terms of correctness, this seems okay (although the Jenkins build indicates there are some issues). Have you thought about testing? I'm trying to brainstorm how to unit test this as well.

@JoshRosen
Contributor Author

Hmm, let me think about this a bit...

I don't think that we have any end-to-end tests with task speculation right now, since Spark won't schedule speculated tasks on executors that are running on the same host. However, I don't think that end-to-end tests should be the only/best way to test this, since there are a few special cases that require us to have finer-grained control over the event interleaving.

Here are some of the scenarios that I'd like to test:

  • Two copies of a task are running and both try to commit at (essentially) the same time. In other words, needsTaskCommit should return true for both tasks, but only one should be allowed to commit.
  • The task that is authorized to commit crashes before committing. In this case, a new copy of the task should be scheduled and that task should be able to successfully commit its output (i.e. the "lock" held by the winning task should be released if that task dies without completing the commit process).

Since this involves an interplay between the DAGScheduler logic, the OutputCommitCoordinator, and actual tasks, we might have to use a mock TaskSchedulerBackend and a mock task to test this; a rough sketch of the first scenario is below.
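To make the first scenario concrete, something along these lines (written against a stub with "first committer wins" semantics rather than the real coordinator API, since that's still in flux in this patch):

import scala.collection.mutable

// Stub coordinator: the first attempt to ask for a partition wins.
class StubCommitCoordinator {
  private val winners = mutable.Map[Int, Long]()  // partition -> winning attempt
  def canCommit(partition: Int, attempt: Long): Boolean = synchronized {
    winners.getOrElseUpdate(partition, attempt) == attempt
  }
}

val coordinator = new StubCommitCoordinator
// Two copies of the same task ask at (essentially) the same time:
assert(coordinator.canCommit(partition = 0, attempt = 1L))   // original attempt wins
assert(!coordinator.canCommit(partition = 0, attempt = 2L))  // speculative copy is denied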

@SparkQA

SparkQA commented Jan 20, 2015

Test build #25834 has finished for PR 4066 at commit 63a7707.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class AskPermissionToCommitOutput(

@@ -105,10 +107,24 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
  def commit() {
    val taCtxt = getTaskContext()
    val cmtr = getOutputCommitter()
    val conf = SparkEnv.get.conf
    val dagSchedulerActor =
      AkkaUtils.makeDriverRef("DAGSchedulerSupervisor/DAGScheduler", conf, SparkEnv.get.actorSystem)
Contributor

All of the tests that broke did so because of "Name is not unique" errors. I don't know Akka nearly as well as I should, but why was this name added explicitly?

Contributor Author

The idea here was modeled after how Executor -> Driver communication works for the HeartbeatReceiver actor.

On the driver, HeartbeatReceiver is started with an explicit name:

private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

The executors use this name to construct a reference to the actor:

val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

Looking at the Akka docs:

The name parameter is optional, but you should preferably name your actors, since that is used in log messages and for identifying actors. The name must not be empty or start with $, but it may contain URL encoded characters (eg. %20 for a blank space). If the given name is already in use by another child to the same parent an InvalidActorNameException is thrown.

If you don't pass an explicit name, the actor is assigned an automatically-generated name (reference).

So, my hunch is that we're somehow creating multiple actors with the same name. I wonder if this is due to sharing between tests or improper cleanup, or whether there's something wrong with my use of explicit actor names here.
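For reference, this is the failure mode that produces those errors: registering a second actor under the same explicit name in the same ActorSystem throws InvalidActorNameException. A standalone example, unrelated to the Spark wiring:

import akka.actor.{Actor, ActorSystem, Props}

class Dummy extends Actor {
  def receive: Actor.Receive = { case _ => }
}

val system = ActorSystem("demo")
system.actorOf(Props(new Dummy), "OutputCommitCoordinator")  // first registration succeeds
// A second registration with the same explicit name fails with something like:
// akka.actor.InvalidActorNameException: actor name [OutputCommitCoordinator] is not unique!
system.actorOf(Props(new Dummy), "OutputCommitCoordinator")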

Previously, SparkHadoopWriter always committed its tasks without
question. The problem is that when speculation is turned on, sometimes
this can result in multiple tasks committing their output to the same
partition. Even though an HDFS-writing task may be re-launched due to
speculation, the original task is not killed and may eventually commit
as well.

This can cause strange race conditions where multiple tasks that commit
interfere with each other, with the result being that some partition
files are actually lost entirely. For more context on these kinds of
scenarios, see the aforementioned JIRA ticket.

In Hadoop MapReduce jobs, the application master is a central
coordinator that authorizes whether or not any given task can commit.
Before a task commits its output, it queries the application master as
to whether or not such a commit is safe, and the application master does
bookkeeping as tasks are requesting commits. Duplicate tasks that would
write to files that were already written to from other tasks are
prohibited from committing.

This patch emulates that functionality - the crucial missing component was
a central arbitrator, which is now a module called the OutputCommitCoordinator.
The coordinator lives on the driver and the executors can obtain a reference
to this actor and request its permission to commit. As tasks commit and are
reported as completed successfully or unsuccessfully by the DAGScheduler,
the commit coordinator is informed of the task completion events as well
to update its internal state.
@mccheah
Contributor

mccheah commented Jan 22, 2015

The linked pull request takes your ideas, makes them compatible with master, and adds unit tests. Feel free to take a look.

The SparkHadoopWriter wasn't serializable because the commit coordinator
was a class member. It doesn't need to be, so just get it on demand when
committing the task.
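A rough sketch of that "get it on demand" pattern (the SparkEnv accessor name here is an assumption, not taken from the patch):

import org.apache.spark.SparkEnv

// Nothing is stored as a field of the writer (which must be serializable);
// the coordinator reference is resolved on the executor only at commit time.
class WriterSketch extends Serializable {
  def commit(): Unit = {
    val coordinator = SparkEnv.get.outputCommitCoordinator  // looked up on demand
    // ... ask the coordinator for permission, then commit via the Hadoop committer ...
  }
}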
But we don't care about what's actually in the TaskInfo object: testing of
OutputCommitCoordinator is done in OutputCommitCoordinatorSuite, which is
the only code path that relies on the TaskInfo's fields. Nevertheless, the
TaskInfo needs to be non-null so that that code path doesn't throw an NPE.
Before, a poison pill message was sent to the actor. That is not the
paradigm used for other actors in Spark, though, so this makes it more
like, e.g., the MapOutputTracker actor.
@JoshRosen
Contributor Author

@mccheah Thanks for picking this up. I'm going to close my PR and continue discussion on yours.

override private[spark] def createSparkEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus): SparkEnv = {
Contributor

indent these once more

Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix

Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
@andrewor14
Contributor

LGTM, I will merge this once tests pass

@SparkQA

SparkQA commented Feb 11, 2015

Test build #27254 has finished for PR 4066 at commit 658116b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

retest this please

@SparkQA

SparkQA commented Feb 11, 2015

Test build #27264 has finished for PR 4066 at commit 658116b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 1cb3770 Feb 11, 2015
asfgit pushed a commit that referenced this pull request Feb 11, 2015
[SPARK-4879] Use driver to coordinate Hadoop output committing for speculative tasks

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled sometimes this can result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well.

This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.

Future work includes more rigorous unit testing and extra optimizations, should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It is also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences: data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true.  It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by mccheah), which in turn was an updated version of this PR.

Closes #4155.

Author: mcheah <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #4066 from JoshRosen/SPARK-4879-sparkhadoopwriter-fix and squashes the following commits:

658116b [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
ed783b2 [Josh Rosen] Address Andrew’s feedback.
e7be65a [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
14861ea [Josh Rosen] splitID -> partitionID in a few places
ed8b554 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
48d5c1c [Josh Rosen] Roll back copiesRunning change in TaskSetManager
3969f5f [Josh Rosen] Re-enable guarding of commit coordination with spark.speculation setting.
ede7590 [Josh Rosen] Add test to ensure that a job that denies all commits cannot complete successfully.
97da5fe [Josh Rosen] Use actor only for RPC; call methods directly in DAGScheduler.
f582574 [Josh Rosen] Some cleanup in OutputCommitCoordinatorSuite
a7c0e29 [Josh Rosen] Create fake TaskInfo using dummy fields instead of Mockito.
997b41b [Josh Rosen] Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
459310a [Josh Rosen] Roll back TaskSetManager changes that broke other tests.
dd00b7c [Josh Rosen] Move CommitDeniedException to executors package; remove `@DeveloperAPI` annotation.
c79df98 [Josh Rosen] Some misc. code style + doc changes:
f7d69c5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
92e6dc9 [Josh Rosen] Bug fix: use task ID instead of StageID to index into authorizedCommitters.
b344bad [Josh Rosen] (Temporarily) re-enable “always coordinate” for testing purposes.
0aec91e [Josh Rosen] Only coordinate when speculation is enabled; add configuration option to bypass new coordination.
594e41a [mcheah] Fixing a scalastyle error
60a47f4 [mcheah] Writing proper unit test for OutputCommitCoordinator and fixing bugs.
d63f63f [mcheah] Fixing compiler error
9fe6495 [mcheah] Fixing scalastyle
1df2a91 [mcheah] Throwing exception if SparkHadoopWriter commit denied
d431144 [mcheah] Using more concurrency to process OutputCommitCoordinator requests.
c334255 [mcheah] Properly handling messages that could be sent after actor shutdown.
8d5a091 [mcheah] Was mistakenly serializing the accumulator in test suite.
9c6a4fa [mcheah] More OutputCommitCoordinator cleanup on stop()
78eb1b5 [mcheah] Better OutputCommitCoordinatorActor stopping; simpler canCommit
83de900 [mcheah] Making the OutputCommitCoordinatorMessage serializable
abc7db4 [mcheah] TaskInfo can't be null in DAGSchedulerSuite
f135a8e [mcheah] Moving the output commit coordinator from class into method.
1c2b219 [mcheah] Renaming oudated names for test function classes
66a71cd [mcheah] Removing whitespace modifications
6b543ba [mcheah] Removing redundant accumulator in unit test
c9decc6 [mcheah] Scalastyle fixes
bc80770 [mcheah] Unit tests for OutputCommitCoordinator
6e6f748 [mcheah] [SPARK-4879] Use the Spark driver to authorize Hadoop commits.

(cherry picked from commit 1cb3770)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
@andrewor14
Contributor

I only merged this into master and 1.3 for now because there are significant merge conflicts. Thanks @JoshRosen and @mccheah.

asfgit pushed a commit that referenced this pull request Mar 30, 2015
…d Parquet tables

This PR leverages the output commit coordinator introduced in #4066 to help committing Hive and Parquet tables.

This PR extracts output commit code in `SparkHadoopWriter.commit` to `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on executor side.

TODO

- [ ] Add tests


Author: Cheng Lian <[email protected]>

Closes #5139 from liancheng/spark-6369 and squashes the following commits:

72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables

(cherry picked from commit fde6945)
Signed-off-by: Cheng Lian <[email protected]>
asfgit pushed a commit that referenced this pull request Mar 30, 2015
yu-iskw pushed a commit to yu-iskw/spark that referenced this pull request Apr 3, 2015
@matrixlibing

SPARK-4879 also happens when using saveAsNewAPIHadoopFile.
Why isn't the saveAsNewAPIHadoopFile function supported?
