
[CELEBORN-1838] Interrupt spark task should not report fetch failure #3070

Closed
wants to merge 4 commits

Conversation

@FMX (Contributor) commented Jan 18, 2025

What changes were proposed in this pull request?
Do not trigger a fetch failure if a Spark task attempt is interrupted (speculation enabled).
Do not trigger a fetch failure if the getReducerFileGroup RPC times out.
This PR targets the celeborn-0.5 branch.

Why are the changes needed?
Avoid unnecessary fetch failures and stage re-runs.

Does this PR introduce any user-facing change?
NO.

How was this patch tested?

  1. GA.
  2. Manually tested on cluster with spark speculation tasks.

Here is the test case

```scala
sc.parallelize(1 to 100, 100)
  .flatMap(i => (1 to 150000).iterator)
  .groupBy(i => i, 100)
  .map { i =>
    if (i._1 < 5) {
      Thread.sleep(15000)
    }
    i
  }
  .repartition(400)
  .count
```
[Four Spark UI screenshots from the manual test, captured 2025-01-18]

@FMX FMX force-pushed the branch-0.5-b1838 branch 4 times, most recently from 42db5b4 to 1395d31 Compare January 18, 2025 10:02
@FMX FMX force-pushed the branch-0.5-b1838 branch 2 times, most recently from 0092684 to e0b3222 Compare January 20, 2025 08:42
@RexXiong (Contributor) left a comment:


LGTM

The review thread below refers to this snippet:

```scala
ce: Throwable): Unit = {
  if (ce.getCause != null &&
    (ce.getCause.isInstanceOf[InterruptedException] ||
      ce.getCause.isInstanceOf[TimeoutException])) {
```
Member

Quick question: does the TimeoutException only happen in the speculation task?

Contributor Author

A TimeoutException may occur when something is wrong with either the driver or the executor.
If the executor is at fault, it may hit a timeout, in which case we want the task to retry itself.
If the driver is at fault, throwing a fetch failure is unlikely to save the situation.
So in the current implementation we treat all timeout exceptions as ordinary exceptions and let the task retry itself.
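The rationale above can be sketched as a small predicate over the exception's cause. This is an illustrative sketch only: `shouldReportFetchFailure` is a hypothetical helper name, not Celeborn's actual API; it merely mirrors the condition in the reviewed snippet.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical helper (illustration only, not Celeborn's actual API):
// decide whether an exception caught during shuffle fetch should be
// reported to Spark as a fetch failure (triggering a stage re-run).
// Interrupts (speculative-attempt kills) and RPC timeouts are treated
// as ordinary failures so the task simply retries itself.
def shouldReportFetchFailure(ce: Throwable): Boolean = {
  val cause = ce.getCause
  cause == null ||
    !(cause.isInstanceOf[InterruptedException] ||
      cause.isInstanceOf[TimeoutException])
}
```

For example, an exception wrapping a `TimeoutException` or `InterruptedException` would not be reported as a fetch failure, while an exception with any other (or no) cause still would.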

Member

The current target branch is branch-0.5; should it be main?

Contributor Author

Since the main branch already contains your PR, this problem does not exist there, so this PR won't be cherry-picked to main.

Member

It seems #2921 handles the InterruptedException but does not cover the TimeoutException.

Contributor Author

Yes, perhaps another PR against main to cover the timeout exception.

Member

I think it should be fine to backport the whole patch to the main branch.


codecov bot commented Jan 21, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 41.22%. Comparing base (2c7847e) to head (db0108c).
Report is 9 commits behind head on branch-0.5.

Additional details and impacted files
@@              Coverage Diff               @@
##           branch-0.5    #3070      +/-   ##
==============================================
+ Coverage       41.16%   41.22%   +0.06%     
==============================================
  Files             226      226              
  Lines           14503    14503              
  Branches         1312     1312              
==============================================
+ Hits             5969     5977       +8     
+ Misses           8193     8184       -9     
- Partials          341      342       +1     


turboFei pushed a commit that referenced this pull request Jan 22, 2025. The commit message duplicates the PR description and screenshots above.

Closes #3070 from FMX/branch-0.5-b1838.

Authored-by: mingji <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
@turboFei (Member): thanks, merged to branch-0.5

@turboFei turboFei closed this Jan 22, 2025
turboFei pushed a commit to turboFei/incubator-celeborn that referenced this pull request Jan 22, 2025. The commit message duplicates the PR description and screenshots above.

Closes apache#3070 from FMX/branch-0.5-b1838.
turboFei added a commit that referenced this pull request Jan 23, 2025

Backport #3070 to the main branch. The commit message otherwise duplicates the PR description and screenshots above.

Closes #3080 from turboFei/b1838.

Lead-authored-by: mingji <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>