
[SPARK-43327] Trigger committer.setupJob before plan execute in FileFormatWriter#write #41000

Closed
wants to merge 2 commits

Conversation

zzzzming95
Contributor

What changes were proposed in this pull request?

Trigger committer.setupJob before plan execute in FileFormatWriter#write

Why are the changes needed?

#38358 resolved the case where outputOrdering might not work when AQE is enabled.

However, because that change materializes the AQE plan in advance (it triggers getFinalPhysicalPlan()), committer.setupJob(job) may never execute when AdaptiveSparkPlanExec#getFinalPhysicalPlan() fails with an error.
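
For context, here is a condensed sketch of the ordering this PR establishes in FileFormatWriter#write. It is illustrative only: the comment text and the setupJob/materializeAdaptiveSparkPlan names come from the review discussion below, while the surrounding control flow is simplified and not the exact diff.

// This call shouldn't be put into the `try` block below because it only
// initializes and prepares the job; any exception thrown from here
// shouldn't cause abortJob() to be called. It must run before
// `materializeAdaptiveSparkPlan()`.
committer.setupJob(job)

// Only after setup is the AQE plan materialized. Before this PR,
// getFinalPhysicalPlan() could fail here with setupJob() never having run.
val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)

try {
  // ... run the write job over materializedPlan, then commit ...
  // (schematic: commitMessages stands in for the collected task commit messages)
  committer.commitJob(job, commitMessages)
} catch {
  case cause: Throwable =>
    committer.abortJob(job)
    throw cause
}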

Does this PR introduce any user-facing change?

no

How was this patch tested?

Added a unit test.

github-actions bot added the SQL label Apr 30, 2023
@zzzzming95
Contributor Author

@EnricoMi @cloud-fan @dongjoon-hyun can you take a look? Thanks!

@EnricoMi
Contributor

What is the fallout of committer.setupJob(job) not being executed in the presence of an error?


// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
// It must be run before `materializeAdaptiveSparkPlan()`
Contributor

Maybe a similar comment above the line below would also be helpful:

val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)

Contributor Author

What is the fallout of committer.setupJob(job) not being executed in the presence of an error?

Spark deletes the partition location when running an INSERT OVERWRITE.

#41000 (comment)

The new location is then created in committer.setupJob(job), after which the job executes. But #38358 triggered the job execution in advance.

So when the job execution fails, the location path has been deleted but never recreated.
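
To make the failure sequence concrete, a schematic of the order of operations after #38358 (illustrative, based on the discussion above; not the actual diff):

// 1. INSERT OVERWRITE has already deleted the existing partition location.

// 2. After #38358 the AQE plan is materialized first; if
//    getFinalPhysicalPlan() throws here, execution stops...
val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)

// 3. ...so this call, which would recreate the output location, is never
//    reached, and the partition location stays deleted.
committer.setupJob(job)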

@EnricoMi
Contributor

I think Spark 3.2 is EOL; the final patch release, 3.2.4, was a month ago. So this should target branch-3.3.

Note that a similar fix went into master and branch-3.4: #39431

@EnricoMi
Contributor

Is this fixing #38358 (comment)?

@zzzzming95
Contributor Author

Is this fixing #38358 (comment)?

Yes.

@dongjoon-hyun
Member

Hi, @zzzzming95.
According to the Apache Spark versioning policy, Apache Spark 3.2 has already reached EOL and 3.2.4 was the last release. As a result, we close all PRs against branch-3.2.

@zzzzming95
Contributor Author

Hi, @zzzzming95. According to the Apache Spark versioning policy, Apache Spark 3.2 has already reached EOL and 3.2.4 was the last release. As a result, we close all PRs against branch-3.2.

OK, I see a similar implementation for Spark 3.3, and I will submit it against Spark 3.3.

@dongjoon-hyun
Member

@zzzzming95, I'm not sure you are aware of the Apache Spark backporting policy. To prevent future regressions, we start the review from the master branch first, then backport from master to branch-3.4 and branch-3.3.

OK, I see a similar implementation for Spark 3.3, and I will submit it against Spark 3.3.
