[SPARK-35290][SQL] Append new nested struct fields rather than sort for unionByName with null filling #32448
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
@@ -429,7 +429,7 @@ object ParquetFileFormat extends Logging {
     }

     finalSchemas.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
+      try left.merge(right, sparkSession.sessionState.conf.resolver) catch { case e: Throwable =>
Also don't know enough about this code to know what the impact is
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
cc @viirya FYI
Force-pushed from 50e45d1 to 3c0d3d0
ok to test
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138240 has finished for PR 32448 at commit
Thanks for working on this! It looks like a better approach. Let me take a closer look in the next few days.
     assert(unionDf.schema.toDDL ==
       "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " +
-      "`nested`: STRUCT<`A`: INT, `a`: INT, `b`: BIGINT, `c`: STRING>>")
+      "`nested`: STRUCT<`a`: INT, `c`: STRING, `A`: INT, `b`: BIGINT>>")
Can we update migration guide (https://github.com/apache/spark/blob/master/docs/sql-migration-guide.md)?
is this an expected behavior change? and why do we prefer the new behavior?
Yeah, the nested fields don't necessarily have to be sorted. The behaviour should be the same as, or at least similar to, that of the outermost schema. Sorting is somewhat unexpected, I think.
You want a 3.1 -> 3.2 migration message saying fields are no longer sorted but instead kept in order with new fields added to the end?
yes
Following on from that, how is it determined when things are backported to previous releases (3.1 in this case) versus saved for the next minor release? Is that up to submitters, or do maintainers make that call? There's a little bit of a "behavior change" here, though it's mostly a bug fix. The same goes for #32338, a bug fix that could be useful in a 3.1 patch release.
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
Overall looks good, with a few comments.
This approach looks more straightforward for adding missing fields. Previously I hadn't thought about this approach and took a more complicated one.
Let me further look at the test portion.
Sounds good, wanted to make sure things looked sane before cleaning things up. I'll try and get those comments finished up in the next day or so.
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
Should I remove the findMissingFields function that was added for the original method?
Yea, please remove it as it is useless now.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138481 has finished for PR 32448 at commit
The GitHub test failure looks like it is due to #32533
#32533 is merged, can you rebase to get the fix?
Kubernetes integration test status success
Force-pushed from 418847a to 7bca531
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #139743 has finished for PR 32448 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139745 has finished for PR 32448 at commit
-    assert(StructType.findMissingFields(source4, schema, resolver)
-      .exists(_.sameType(missing4)))
+    assert(schema2.merge(schema1, resolver).sameType(StructType.fromDDL(
+      "a2 STRING, a3 DOUBLE, nested STRUCT<b2: STRING, b3: DOUBLE, b1: INT>, a1 INT"
When `schema2` merges `schema1`, don't we keep its original case? E.g. "A2 STRING, a3 DOUBLE, nested STRUCT<B2: STRING, b3: DOUBLE, b1: INT>, a1 INT"
Ah yeah it does, `sameType` is just doing a case insensitive comparison so it didn't matter that my manual type was wrong. I'll update to `===` instead and that seems to fix the check
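To illustrate why the loose check hid the mistake, here is a minimal sketch in plain Scala. It does not use Spark: `Field` and `looselyEqual` are hypothetical stand-ins for a struct field and a case insensitive type comparison, and the field names are taken from the flat part of the schemas quoted above.

```scala
object NameCaseComparison {
  // Hypothetical flat field model; Spark's real StructField also carries nullability and metadata.
  final case class Field(name: String, dataType: String)

  // Loose check: same types in the same positions, names compared ignoring case.
  def looselyEqual(a: Seq[Field], b: Seq[Field]): Boolean =
    a.length == b.length && a.zip(b).forall { case (x, y) =>
      x.dataType == y.dataType && x.name.equalsIgnoreCase(y.name)
    }

  // Merged result that keeps the original spelling of the first field ("A2").
  val actual: Seq[Field] =
    Seq(Field("A2", "STRING"), Field("a3", "DOUBLE"), Field("a1", "INT"))

  // Hand-written expectation with the wrong case ("a2").
  val expected: Seq[Field] =
    Seq(Field("a2", "STRING"), Field("a3", "DOUBLE"), Field("a1", "INT"))
}
```

With these values, `looselyEqual(actual, expected)` is true while `actual == expected` is false, so only the strict comparison catches the wrong case in the expected schema.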
Looks okay, with one minor question.
@@ -181,7 +100,8 @@ object ResolveUnion extends Rule[LogicalPlan] {
         // like that. We will sort columns in the struct expression to make sure two sides of
         // union have consistent schema.
         aliased += foundAttr
-        Alias(addFields(foundAttr, target), foundAttr.name)()
+        val targetType = target.merge(source, conf.resolver)
BTW, `merge` will throw an exception if two schemas conflict. I recall that union of conflicting schemas doesn't fail in `ResolveUnion`, but in `CheckAnalysis`. Could we follow the original behavior?
Hmmm good question, I'm not sure exactly how that would work without adding extra logic to StructType.merge to ignore conflicts. And now that you bring that up I'm starting to think using StructType.merge isn't the best method since it does care about DataType. I just noticed it doesn't handle similar types, so you get errors if you try to merge a float and a double, whereas the normal union just handles that. I might try to rework this again to not use the StructType.merge after all...
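A rough sketch of the difference being described, in plain Scala rather than Spark. `strictMerge` and `widen` are hypothetical helpers: the first models a merge that rejects any type mismatch, the second models a union that picks the wider numeric type. Neither is the actual Spark implementation.

```scala
object StrictVersusWidening {
  // Hypothetical two-type numeric model, for illustration only.
  sealed trait Num
  case object FloatT extends Num
  case object DoubleT extends Num

  // Strict merge: identical types only; any mismatch is reported as an error.
  def strictMerge(l: Num, r: Num): Either[String, Num] =
    if (l == r) Right(l) else Left(s"Failed to merge incompatible types $l and $r")

  // Widening: a mismatch resolves to the wider of the two types.
  def widen(l: Num, r: Num): Num =
    if (l == DoubleT || r == DoubleT) DoubleT else FloatT
}
```

Under this model, `strictMerge(FloatT, DoubleT)` returns a `Left` while `widen(FloatT, DoubleT)` returns `DoubleT`, which is the mismatch between merge-based resolution and the usual union handling.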
So @viirya's comment made me realize that Separately, the So I can either just push the minor update I have here to address the comment, or I can close this and open two separate PRs to address each individually. |
Test build #139825 has finished for PR 32448 at commit
…e test for case insensitivity
Force-pushed from 4a14101 to 5762ddc
I pushed the small fix that moves the compatibility analysis after the union resolving, making ResolveUnion no longer use StructType.merge. I updated the title and description as well. Let me know if you want me to create separate PRs for each now instead.
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139827 has finished for PR 32448 at commit
Any thoughts? I think it definitely makes sense to split this into two PRs now, since there are two separate bugs being fixed. I could either back out all of the StructType.merge changes in this PR to only have the unionByName fix (that doesn't require StructType.merge anymore, and I'm annoyed I didn't figure that out sooner), or close this PR and start fresh with two new PRs (and a new JIRA for the separate bug that the StructType.merge update fixes).
I'm +1 to open 2 fresh PRs, thanks!
Closing in favor of #33040
What changes were proposed in this pull request?
This PR changes the unionByName with null filling logic to append new nested struct fields from the right side of the union to the schema, rather than sorting fields alphabetically. It removes the need to use UpdateField expressions and directly projects new nested structs from each side of the union with the correct schema. The union'd schema changes from alphabetically sorted to "left dominant": the fields from the left side of the union come first, and the missing ones from the right are appended in the order in which they originally appear.
It also adds a resolver to the StructType merging to handle case insensitivity, because the resulting union's logical and physical expressions use StructType.merge to produce the schema of the union.
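The "left dominant" ordering can be sketched in plain Scala as follows. This is only a simplified model, not the code in this PR: `DType`, `Atom`, `Struct`, `Resolver`, and `merge` are hypothetical names, and the model ignores nullability, arrays, maps, and type conflicts.

```scala
object LeftDominantMerge {
  // Simplified stand-ins for a data type hierarchy (hypothetical, for illustration only).
  sealed trait DType
  final case class Atom(name: String) extends DType
  final case class Struct(fields: List[(String, DType)]) extends DType

  // A resolver decides whether two field names refer to the same field.
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Keep every left field in place, recurse into structs present on both sides,
  // then append right-only fields in the order they appear on the right.
  def merge(left: Struct, right: Struct, resolver: Resolver): Struct = {
    val merged: List[(String, DType)] = left.fields.map { case (name, dt) =>
      val matched = right.fields.find { case (rn, _) => resolver(name, rn) }
      (dt, matched) match {
        case (l: Struct, Some((_, r: Struct))) => (name, merge(l, r, resolver))
        case _ => (name, dt)
      }
    }
    val missing = right.fields.filterNot { case (rn, _) =>
      left.fields.exists { case (ln, _) => resolver(ln, rn) }
    }
    Struct(merged ++ missing)
  }

  // Render a type as a DDL-like string for easy comparison.
  def render(dt: DType): String = dt match {
    case Atom(n) => n
    case Struct(fs) =>
      fs.map { case (n, t) => s"$n: ${render(t)}" }.mkString("STRUCT<", ", ", ">")
  }
}
```

For example, merging a left struct `STRUCT<a: INT, c: STRING>` with a right struct `STRUCT<A: INT, b: BIGINT>` under `caseSensitive` renders as `STRUCT<a: INT, c: STRING, A: INT, b: BIGINT>`. Under `caseInsensitive`, `A` resolves to the existing `a`, so the result is `STRUCT<a: INT, c: STRING, b: BIGINT>`.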
Why are the changes needed?
Certain nested structs caused unionByName with null filling to fail, because of part of the logic that rewrote the expression tree to sort the structs.
Does this PR introduce any user-facing change?
It shouldn't, other than fixing certain cases that previously caused errors. I don't know whether adding the resolver to the StructType merging has any unintended side effects, so I would definitely like some thoughts on that. Also, the order of the StructFields is slightly different now, though that shouldn't have much of an effect.
How was this patch tested?
Updated existing tests based on the new StructField ordering and added a new test for the case that was broken originally.