-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter #21821
Conversation
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { | |||
val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options) | |||
if (writer.isPresent) { | |||
runCommand(df.sparkSession, "save") { | |||
WriteToDataSourceV2(writer.get(), df.logicalPlan) | |||
WriteToDataSourceV2(writer.get(), df.planWithBarrier) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is not needed but it is safe to have.
Test build #93305 has finished for PR 21821 at commit
|
Test build #93309 has finished for PR 21821 at commit
|
Test build #93319 has finished for PR 21821 at commit
|
retest this please |
Test build #93323 has finished for PR 21821 at commit
|
~~We need a separate rule to eliminate barriers for the write path and CTAS, since the input queries are not always children of these nodes. Thus, the current EliminateBarriers does not work. ~~ So far, all the relevant plans since 2.3 have extended |
shall we fix the non-idempotent analyzer rule for 2.3.2? |
@@ -891,8 +891,9 @@ object DDLUtils { | |||
* Throws exception if outputPath tries to overwrite inputpath. | |||
*/ | |||
def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = { | |||
val inputPaths = query.collect { | |||
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths | |||
val inputPaths = EliminateBarriers(query).collect { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AnalysisBarrier is a leaf node. That is one of the reasons why it could easily break the other code.
@cloud-fan This sounds good to me. @maryannxue Please fix the rule |
Test build #93416 has finished for PR 21821 at commit
|
Yes, @gatorsmile. Code is ready. Will post a PR shortly. |
cc @hvanhovell |
@gatorsmile do we still need this patch if maryann fixes this? |
@hvanhovell The question is whether |
LGTM. |
LGTM |
Test build #93553 has finished for PR 21821 at commit
|
Is this still valid since #21822 is going on? Shall we have this only on 2.3 main branches? |
@mgaido91 See the comment #21821 (comment) |
This PR is majorly for Spark 2.3 branch. The code changes will be removed from the master branch when #21822 is merged. However, the test cases are still valid. |
LGTM |
Test build #93555 has finished for PR 21821 at commit
|
```Scala val udf1 = udf({(x: Int, y: Int) => x + y}) val df = spark.range(0, 3).toDF("a") .withColumn("b", udf1($"a", udf1($"a", lit(10)))) df.cache() df.write.saveAsTable("t") ``` Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent. Added a test. Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869 Author: Xiao Li <[email protected]> Closes #21821 from gatorsmile/testMaster22. (cherry picked from commit d2e7deb) Signed-off-by: Xiao Li <[email protected]>
Thanks! Merged to master/2.3 |
What changes were proposed in this pull request?
Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.
How was this patch tested?
Added a test.
Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869