[SPARK-43327] Trigger committer.setupJob before plan execution in FileFormatWriter#write #41000
Conversation
@EnricoMi @cloud-fan @dongjoon-hyun can you take a look? Thanks!
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
// It must be run before `materializeAdaptiveSparkPlan()`
Maybe a similar comment above the line below would also be helpful:
val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)
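For context, a minimal sketch of the ordering this change aims for in FileFormatWriter#write is below; it is simplified and not the exact upstream code.

```scala
// Simplified sketch, not the exact upstream code: setupJob runs first, outside
// the try block, so an exception thrown from it (or from plan materialization)
// does not trigger abortJob().
committer.setupJob(job)

// Materializing the adaptive plan (which calls getFinalPhysicalPlan and may
// execute query stages) happens only after the job has been set up.
val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)

try {
  // ... launch the write tasks against materializedPlan ...
  committer.commitJob(job, commitMsgs)
} catch {
  case cause: Throwable =>
    committer.abortJob(job)
    throw cause
}
```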
What is the fallout of committer.setupJob(job) not being executed in the presence of an error?
When running an insert overwrite, Spark deletes the partition location and recreates it in committer.setupJob(job), then executes the job. But #38358 triggered the job execution in advance, so when that execution fails, the partition location has already been deleted and is never recreated.
I think Spark 3.2 is EOL; the final patch release was 3.2.4, a month ago. So this should target branch-3.3. Note that a similar fix went into master and branch-3.4: #39431
Is this fixing #38358 (comment)?
Yes.
Hi, @zzzzming95.
OK, I see there is a similar implementation for Spark 3.3, and I will submit it for Spark 3.3.
@zzzzming95, I'm not sure you are aware of the Apache Spark backporting policy. To prevent a future regression, we start to review from
What changes were proposed in this pull request?
Trigger committer.setupJob before plan execution in FileFormatWriter#write.
Why are the changes needed?
#38358 resolved the case where outputOrdering might not work when AQE is enabled. However, because it materializes the AQE plan in advance (triggering getFinalPhysicalPlan), committer.setupJob(job) may not be executed when AdaptiveSparkPlanExec#getFinalPhysicalPlan() fails with an error.
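Roughly, the materialization step looks like the sketch below (simplified from the fix that went into master and branch-3.4; treat the exact shape as an assumption).

```scala
// Simplified sketch of the materialization step: for an adaptive plan it
// resolves the final physical plan (getFinalPhysicalPlan), which can execute
// whole query stages and therefore fail before any write task has started.
private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
  case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
  case p: SparkPlan => p
}
```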
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a unit test.
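A rough sketch of the kind of regression test this implies is below; it is not the PR's actual test, and the table layout and the helper UDF name failOnPurpose are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class OverwriteSetupJobSuite extends AnyFunSuite {
  test("failed INSERT OVERWRITE should not lose the existing partition data") {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Hypothetical UDF that always fails at runtime for the sample data below.
    spark.udf.register("failOnPurpose", (i: Int) => {
      if (i >= 0) throw new RuntimeException("boom"); i
    })

    spark.sql("CREATE TABLE t (c1 INT, p STRING) USING parquet PARTITIONED BY (p)")
    spark.sql("INSERT INTO t PARTITION (p = 'a') VALUES (1)")

    // The DISTINCT subquery forces a shuffle, so with AQE the failure can
    // surface while the final plan is being materialized, i.e. before the
    // write job itself starts.
    intercept[Exception] {
      spark.sql(
        """INSERT OVERWRITE TABLE t PARTITION (p = 'a')
          |SELECT c1 FROM (SELECT DISTINCT failOnPurpose(c1) AS c1 FROM t)""".stripMargin)
    }

    // With setupJob running before plan materialization, the old partition
    // data (and its location) should survive the failed overwrite.
    assert(spark.sql("SELECT * FROM t WHERE p = 'a'").count() == 1)
    spark.stop()
  }
}
```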