
[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent. #16503

Closed

Conversation


@jinxing64 jinxing64 commented Jan 8, 2017

What changes were proposed in this pull request?

Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If the request times out, it is sent again, so AskPermissionToCommitOutput can be received multiple times. Method canCommit should therefore return the same value when called by the same attempt multiple times.

In the implementation before this fix, method handleAskPermissionToCommit just checks whether a committer is already registered, which is not enough. When a worker retries AskPermissionToCommitOutput it gets a CommitDeniedException, and the task then fails with reason TaskCommitDenied, which is not regarded as a task failure (SPARK-11178), so TaskScheduler will reschedule the task indefinitely.

In this fix, use ask to replace askWithRetry in canCommit and make the receiver idempotent.
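As a rough illustration, the idempotent check can look like the sketch below. The map name authorizedCommitters and the method shape are illustrative assumptions, not the exact Spark code:

```
import scala.collection.mutable

// Hypothetical state: (stage, partition) -> attempt number of the authorized
// committer. Illustrative only; Spark's real coordinator keeps per-stage state.
val authorizedCommitters = mutable.Map.empty[(Int, Int), Int]

def handleAskPermissionToCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean = {
  authorizedCommitters.get((stage, partition)) match {
    case None =>
      // First request for this partition wins the right to commit.
      authorizedCommitters((stage, partition)) = attemptNumber
      true
    case Some(existing) =>
      // Idempotent: a duplicate request from the already-authorized attempt is
      // approved again; any other attempt is denied.
      existing == attemptNumber
  }
}
```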

How was this patch tested?

Added a new unit test to OutputCommitCoordinatorSuite.

@jinxing64 jinxing64 changed the title [SPARK-18113] Method canCommit should return the same value when call… [SPARK-18113] canCommit should return same when called by same attempt multi times. Jan 8, 2017
@jinxing64 (Author)

@mccheah @JoshRosen @ash211 Could you please take a look at this?

@jinxing64 (Author)

@zsxwing @kayousterhout @andrewor14 Could you please help take a look at this?


vanzin commented Jan 9, 2017

I think this is another case where using askWithRetry makes no sense given the guarantees of the RPC layer.


zsxwing commented Jan 9, 2017

Good catch. Looks good to me.

@vanzin The RPC layer only guarantees at-most-once. Retry may still be helpful in some cases, but the receiver should be idempotent. Either the current change or switching to use ask (Spark will retry the task) is fine with me.


vanzin commented Jan 9, 2017

The RPC layer only guarantees at-most-once

That was the case with akka (I think, not really sure), but the netty RPC layer doesn't drop messages. The new one is "exactly once".


zsxwing commented Jan 9, 2017

That was the case with akka (I think, not really sure), but the netty RPC layer doesn't drop messages. The new one is "exactly once".

It doesn't drop but the connection may be broken. askWithRetry will reconnect if the connection is broken.


vanzin commented Jan 9, 2017

It doesn't drop but the connection may be broken

In which case the executor will die (see CoarseGrainedExecutorBackend::onDisconnected).


zsxwing commented Jan 9, 2017

In which case the executor will die (see CoarseGrainedExecutorBackend::onDisconnected).

Yeah. Didn't recall that. Then I agree that using ask is better.

@jinxing64 (Author)

@zsxwing, @vanzin
Maybe using ask in method canCommit is not suitable, I think, because ask returns a Future, while getting the result of AskPermissionToCommitOutput in canCommit should be a blocking process. The RPC layer doesn't drop messages, but a message can time out. If a timeout happens, we can either retry or fail the task and let Spark reschedule it; both are OK. But for heavy tasks, retrying the RPC is lighter weight than rescheduling the task. So my suggestion is to keep the askWithRetry here and change the receiver to be idempotent. What do you think?


vanzin commented Jan 10, 2017

You can make ask blocking by waiting for its future (e.g. with ThreadUtils.awaitResult).
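For instance, here is a minimal sketch of blocking on ask, assuming Spark's internal RpcEndpointRef.ask[T] and ThreadUtils.awaitResult APIs (these are internal, so exact signatures depend on the Spark version):

```
import scala.concurrent.duration._
import scala.reflect.ClassTag

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.ThreadUtils

// Send the message once and block for the reply; a timeout surfaces as an
// exception instead of triggering a re-send like askWithRetry does.
def blockingAsk[T: ClassTag](endpoint: RpcEndpointRef, message: Any): T = {
  val future = endpoint.ask[T](message)       // non-blocking, returns Future[T]
  ThreadUtils.awaitResult(future, 60.seconds) // wait for the reply or throw on timeout
}
```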

My point about not using askWithRetry is that it's basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

If we can remove uses of askWithRetry as we find these issues, we can, at some point, finally get rid of the API altogether.

RPC layer doesn't drop message but message can be timeout.

Yes it can timeout. You can retry it (basically doing what askWithRetry does) but it should be such an edge case that failing the task should be ok.

If you think about how the RPC layer works when you use askWithRetry, this is what happens:

  • first RPC is sent
  • remote end is blocked on something, RPC is waiting in the queue
  • sender re-sends the RPC
  • lather, rinse, repeat
  • at some point, the receiver goes through the RPC queue and starts responding to the RPCs
  • it responds to the first RPC above first; the sender ignores the answer since that RPC has already timed out
  • lather, rinse, repeat
  • finally the last RPC is responded to and the sender sees the reply

So it's a really expensive way of just doing ask with a longer timeout.

@jinxing64 (Author)

@vanzin
Thanks a lot for your comment. It's very helpful.
I'll change it to ask.
I think it makes sense to keep the receiver idempotent when handling AskPermissionToCommitOutput, even though we use ask to replace askWithRetry. So I didn't remove that code or the unit test.

@jinxing64 (Author)

If we can remove uses of askWithRetry as we find these issues, we can, at some point, finally get rid of the API altogether.

What do you think about providing a blocking ask in RpcEndpointRef? It would just wait for the future, with no retry, so callers wouldn't need to wait on the future outside of RpcEndpointRef. If you agree, I'll make another PR for the change. I've already found several places using a blocking ask, which is why I think it's worth providing one in RpcEndpointRef.
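A sketch of what such a blocking variant could look like inside RpcEndpointRef; this shape matches the askSync method later added in SPARK-19347, but the exact signature here is an assumption:

```
// Inside RpcEndpointRef (T: ClassTag in scope): send once, then block on the
// returned future; RpcTimeout.awaitResult wraps a timeout in a descriptive error.
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
  val future = ask[T](message, timeout)
  timeout.awaitResult(future)
}
```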

@jinxing64 jinxing64 changed the title [SPARK-18113] canCommit should return same when called by same attempt multi times. [SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent. Jan 11, 2017
@jinxing64 (Author)

ping @zsxwing @vanzin
Could you take another look at this, please?

@@ -189,6 +188,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}

test("Authoried commiter get true if it calls canCommit again.") {
Review comment:
Reword test description to: Duplicate calls to canCommit from the authorized committer gets idempotent responses

@@ -221,6 +227,22 @@ private case class OutputCommitFunctions(tempDirPath: String) {
if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
}

// Receiver should be idempotent for AskPermissionToCommitOutput
def callCanCommitMultiTimes(iter: Iterator[Int]): Unit = {
Review comment:
nit rename to callCanCommitMultipleTimes

false
// Coordinator should be idempotent when receiving AskPermissionToCommit.
if (existingCommitter == attemptNumber) {
true
Review comment:
please log a warning here before returning true -- reaching this branch is likely indicative of network problems

            logWarning(s"Authorizing duplicate request to commit for " +
              s"attemptNumber=$attemptNumber to commit for stage=$stage, partition=$partition; " +
              s"existingCommitter = $existingCommitter. This can indicate dropped network traffic.")

@jinxing64 (Author)

@ash211
Thank you so much for your comments. I've made the changes accordingly.
Could you please give another look?


ash211 commented Jan 12, 2017

You covered my concerns!

I think this will fix parts of the problem for sure, though I'm not sure it covers every possible case.


vanzin commented Jan 12, 2017

ok to test


SparkQA commented Jan 12, 2017

Test build #71274 has finished for PR 16503 at commit 6c25755.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


ash211 commented Jan 12, 2017

@jinxing64 can you please fix the failing Scala style tests? You can run them locally with ./dev/scalastyle

@jinxing64 (Author)

@ash211
Thanks a lot for your comment. I've fixed the failing Scala style tests; ./dev/scalastyle now passes locally. Could you take another look?


SparkQA commented Jan 13, 2017

Test build #71284 has finished for PR 16503 at commit aba406d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


vanzin commented Jan 13, 2017

Looks good. If you want to add a blocking version of ask (and add a deprecation tag to askWithRetry) that would be fine too. But maybe a separate change.

@jinxing64 (Author)

@vanzin @zsxwing
Thanks a lot for your comments. I will file another JIRA to add a blocking version of ask.
Is there anything else I can do for this PR? :)

.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
val canCommit2 = SparkEnv.get.outputCommitCoordinator
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
if(canCommit1 && canCommit2) {
Review comment:
nit: space after if

val canCommit2 = SparkEnv.get.outputCommitCoordinator
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
if(canCommit1 && canCommit2) {
Utils.createDirectory(tempDirPath)
Review comment:
Do you need this? Can you just assert that you got permission both times (and rely on the exception causing the runJob call to fail in the caller)?

Author reply:
Yes, using assert is better here.
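A sketch of the assert-based version of the test helper, in the context of OutputCommitCoordinatorSuite and reusing the names from the snippets above (the exact body here is an assumption, not the merged code):

```
// Receiver should be idempotent for AskPermissionToCommitOutput: both calls
// from the same attempt must be authorized, or the assert fails the task and
// the failure propagates to the runJob call in the test.
def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
  val ctx = TaskContext.get()
  val canCommit1 = SparkEnv.get.outputCommitCoordinator
    .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
  val canCommit2 = SparkEnv.get.outputCommitCoordinator
    .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
  assert(canCommit1 && canCommit2)
}
```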


ash211 commented Jan 14, 2017

Making this idempotent looks great. I think there's a separate issue with this code still not handling poorly-timed preemption, but let's deal with that in a separate ticket / PR.

Good work so far @jinxing64 !


SparkQA commented Jan 14, 2017

Test build #71378 has finished for PR 16503 at commit eb5367a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 14, 2017

Test build #71377 has finished for PR 16503 at commit b867b92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 14, 2017

Test build #71380 has finished for PR 16503 at commit 55e4fd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64 (Author)

@vanzin @ash211
Thanks a lot for your comments; I've made the changes accordingly. Please take another look.

@jinxing64 (Author)

ping

@jinxing64 (Author)

@vanzin
Thanks for your comments. I have changed the unit test. Could you take another look?


SparkQA commented Jan 18, 2017

Test build #71560 has finished for PR 16503 at commit 69b412a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 18, 2017

Test build #71558 has finished for PR 16503 at commit 52af8c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64 (Author)

@vanzin
Sorry for the stupid mistake I made. I've fixed it. Please take another look.


vanzin commented Jan 18, 2017

Merging to master.

@asfgit asfgit closed this in 33791a8 Jan 18, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017 (same commit message as this PR's description above).
actuaryzhang pushed a commit to actuaryzhang/spark that referenced this pull request Feb 1, 2017
… multiple times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports block metadata back to `ReceiverTracker` on the driver side. In the current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages being processed multiple times.

**To reproduce**:

1. In `ReceiverTracker`, check whether this is the first time `AddBlock` is received; if so, sleep long enough (say 200 seconds) that the first RPC call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:
```
  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(200))
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
```
**To fix**:

It makes sense to provide a blocking version of `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (apache#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the akka days; it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

## How was this patch tested?
Tested manually. The scenario described above doesn't happen with this patch.

Author: jinxing <[email protected]>

Closes apache#16690 from jinxing64/SPARK-19347.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017 (same commit message as this PR's description above).
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017 (same SPARK-19347 commit message as above).
ghost pushed a commit to dbtsai/spark that referenced this pull request Feb 19, 2017
## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and apache#16690 (comment)) and `askWithRetry` is marked as deprecated.
As mentioned in SPARK-18113 (apache#16503 (comment)):

>askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its uses with `askSync`.

## How was this patch tested?
This PR doesn't change code logic; existing unit tests cover it.

Author: jinxing <[email protected]>

Closes apache#16790 from jinxing64/SPARK-19450.
Yunni pushed a commit to Yunni/spark that referenced this pull request Feb 27, 2017 (same SPARK-19450 commit message as above).
yoonlee95 pushed a commit to yoonlee95/spark that referenced this pull request Aug 17, 2017 (same commit message as this PR's description above).
IceMimosa pushed a commit to IceMimosa/spark that referenced this pull request May 8, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018 (same commit message as this PR's description above).