[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent. #16503
Conversation
@mccheah @JoshRosen @ash211 Could you please take a look at this?
@zsxwing @kayousterhout @andrewor14 Could you please help take a look at this?
I think this is another case where using …
Good catch. Looks good to me. @vanzin The RPC layer only guarantees at-most-once delivery. Retry may still be helpful in some cases, but the receiver should be idempotent. Either the current change or changing to use …
That was the case with akka (I think, not really sure), but the netty RPC layer doesn't drop messages. The new one is "exactly once".
It doesn't drop messages, but the connection may be broken.
In which case the executor will die (see …
Yeah. Didn't recall that. Then I agree that using …
@zsxwing, @vanzin
You can make …
My point of not using …
If we can remove uses of …
Yes it can timeout. You can retry it (basically doing what …
If you think about how the RPC layer works when you use …
So it's a really expensive way of just doing …
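For readers following along, here is a rough conceptual sketch of the retry behaviour being discussed: `askWithRetry` is essentially a blocking `ask` in a retry loop, so a timeout makes it re-send the message. The helper name, retry count, and timeout below are illustrative, not the actual Spark implementation.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

import org.apache.spark.rpc.RpcEndpointRef

// Conceptual sketch only: block on each attempt, re-send on failure/timeout.
// This is why a non-idempotent receiver can observe the same message twice.
def askWithRetrySketch[T: ClassTag](
    ref: RpcEndpointRef,
    message: Any,
    maxRetries: Int = 3,
    timeout: FiniteDuration = 120.seconds): T = {
  var attempts = 0
  var lastError: Throwable = null
  while (attempts < maxRetries) {
    attempts += 1
    try {
      // Each iteration sends the message again and blocks for the reply.
      return Await.result(ref.ask[T](message), timeout)
    } catch {
      case e: Exception => lastError = e // swallow and loop: the message is re-sent
    }
  }
  throw new RuntimeException(s"Ask failed after $maxRetries attempts", lastError)
}
```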
@vanzin
What do you think about providing a "blocking" …
@@ -189,6 +188,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
    assert(
      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
  }

  test("Authoried commiter get true if it calls canCommit again.") {
Reword test description to: Duplicate calls to canCommit from the authorized committer gets idempotent responses
@@ -221,6 +227,22 @@ private case class OutputCommitFunctions(tempDirPath: String) {
      if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
  }

  // Receiver should be idempotent for AskPermissionToCommitOutput
  def callCanCommitMultiTimes(iter: Iterator[Int]): Unit = {
nit: rename to callCanCommitMultipleTimes
false
// Coordinator should be idempotent when receiving AskPermissionToCommit.
if (existingCommitter == attemptNumber) {
  true
please log a warning here before returning true -- reaching this branch is likely indicative of network problems:

logWarning(s"Authorizing duplicate request to commit for " +
  s"attemptNumber=$attemptNumber to commit for stage=$stage, partition=$partition; " +
  s"existingCommitter = $existingCommitter. This can indicate dropped network traffic.")
@ash211
You covered my concerns! I think this will fix some parts of this problem for sure, not sure if it covers every possible case though.
ok to test
Test build #71274 has finished for PR 16503 at commit …
@jinxing64 can you please fix the failing Scala style tests? You can run them locally with …
@ash211
Test build #71284 has finished for PR 16503 at commit …
Looks good. If you want to add a blocking version of …
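(For reference, a blocking `ask` could look roughly like the sketch below: a thin wrapper that awaits the future once and never re-sends. The helper name is made up here; the later follow-ups in SPARK-19347/SPARK-19450 added `askSync` in the same spirit. Compared with `askWithRetry`, the caller decides how to handle a timeout, and the receiver never has to be defensive about duplicates.)

```scala
import scala.reflect.ClassTag

import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}

// Hypothetical helper: a blocking "ask" that waits for the reply once.
// Unlike askWithRetry, a timeout surfaces as an exception instead of a resend,
// so the receiver never sees duplicate messages.
def askBlocking[T: ClassTag](ref: RpcEndpointRef, message: Any, timeout: RpcTimeout): T = {
  val future = ref.ask[T](message, timeout)
  timeout.awaitResult(future)
}
```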
  .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
val canCommit2 = SparkEnv.get.outputCommitCoordinator
  .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
if(canCommit1 && canCommit2) {
nit: space after if
val canCommit2 = SparkEnv.get.outputCommitCoordinator
  .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
if(canCommit1 && canCommit2) {
  Utils.createDirectory(tempDirPath)
Do you need this? Can you just assert that you got permission both times (and rely on the exception causing the runJob call to fail in the caller)?
Yes, using assert is better here.
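(In that spirit, the test helper might end up looking roughly like the sketch below. This is based on the snippet under review, not the exact final code, and assumes it lives inside the suite's OutputCommitFunctions helper where TaskContext, SparkEnv, and the coordinator are in scope.)

```scala
// Receiver should be idempotent for AskPermissionToCommitOutput: both calls
// from the same task attempt must be granted. A failing assert makes the
// surrounding runJob call fail in the test, as suggested above.
def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
  val ctx = TaskContext.get()
  val canCommit1 = SparkEnv.get.outputCommitCoordinator
    .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
  val canCommit2 = SparkEnv.get.outputCommitCoordinator
    .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
  assert(canCommit1 && canCommit2)
}
```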
Making this idempotent looks great. I think there's a separate issue with this code still not handling poorly-timed preemption, but let's deal with that in a separate ticket / PR. Good work so far @jinxing64!
Test build #71378 has finished for PR 16503 at commit …
Test build #71377 has finished for PR 16503 at commit …
Test build #71380 has finished for PR 16503 at commit …
ping
@vanzin
Test build #71560 has finished for PR 16503 at commit …
Test build #71558 has finished for PR 16503 at commit …
@vanzin
Merging to master.
… multiple times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports a block's meta back to `ReceiverTracker` on the driver side. In the current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, so messages may be processed multiple times.

**To reproduce**:

1. Check if it is the first time `ReceiverTracker` receives `AddBlock`; if so, sleep long enough (say 200 seconds) so the first RPC call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:

```
def streamProcessing(): Unit = {
  val conf = new SparkConf()
    .setAppName("StreamingTest")
    .setMaster(masterUrl)
  val ssc = new StreamingContext(conf, Seconds(200))
  val stream = ssc.socketTextStream("localhost", 1234)
  stream.print()
  ssc.start()
  ssc.awaitTermination()
}
```

**To fix**:

It makes sense to provide a blocking version of `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (apache#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the akka days. It imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay much attention to when using it.

## How was this patch tested?

Tested manually. The scenario described above doesn't happen with this patch.

Author: jinxing <[email protected]>

Closes apache#16690 from jinxing64/SPARK-19347.
## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and apache#16690 (comment)) and `askWithRetry` is marked as deprecated. As mentioned in SPARK-18113 (apache#16503 (comment)):

> askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its uses with `askSync`.

## How was this patch tested?

This PR doesn't change code logic; existing unit tests cover it.

Author: jinxing <[email protected]>

Closes apache#16790 from jinxing64/SPARK-19450.
What changes were proposed in this pull request?
Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If it times out, it sends the message again, so AskPermissionToCommitOutput can be received multiple times. Method canCommit should return the same value when called by the same attempt multiple times.
Before this fix, handleAskPermissionToCommit only checked whether a committer was already registered, which is not enough. When a worker retries AskPermissionToCommitOutput it gets CommitDeniedException, and the task then fails with reason TaskCommitDenied, which is not regarded as a task failure (SPARK-11178), so TaskScheduler will schedule this task indefinitely.
In this fix, use `ask` to replace `askWithRetry` in `canCommit` and make the receiver idempotent.

How was this patch tested?
Added a new unit test to OutputCommitCoordinatorSuite.
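For illustration, the idempotent check on the coordinator side looks roughly like the following. This is a simplified sketch of handleAskPermissionToCommit after the change, based on the diff discussed above; the map and constant names mirror OutputCommitCoordinator, but details of the real code may differ.

```scala
private[scheduler] def handleAskPermissionToCommit(
    stage: Int,
    partition: Int,
    attemptNumber: Int): Boolean = synchronized {
  authorizedCommittersByStage.get(stage) match {
    case Some(authorizedCommitters) =>
      authorizedCommitters(partition) match {
        case NO_AUTHORIZED_COMMITTER =>
          // First request for this partition: register the attempt and allow it.
          authorizedCommitters(partition) = attemptNumber
          true
        case existingCommitter if existingCommitter == attemptNumber =>
          // Coordinator is idempotent: a duplicate AskPermissionToCommitOutput
          // from the already-authorized attempt (e.g. resent after an RPC
          // timeout) gets the same positive answer instead of a denial.
          true
        case _ =>
          // A different attempt already holds the commit right.
          false
      }
    case None =>
      // Stage not known to the coordinator (e.g. already finished): deny.
      false
  }
}
```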