-
Notifications
You must be signed in to change notification settings - Fork 433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add schema evolution to merge statement #3136
base: main
Are you sure you want to change the base?
Conversation
fcc92b2
to
7f1b955
Compare
@JustinRush80 can you rebase your branch against main, or allow us to rebase it I will do thorough review tomorrow then :) |
6daa9ec
to
0a45cf0
Compare
@JustinRush80 could you rebase again, something went wrong since files changed is huge |
0a45cf0
to
53042d8
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3136 +/- ##
==========================================
+ Coverage 71.73% 72.12% +0.38%
==========================================
Files 138 138
Lines 44362 45087 +725
Branches 44362 45087 +725
==========================================
+ Hits 31825 32520 +695
- Misses 10496 10504 +8
- Partials 2041 2063 +22 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for picking this up! Just a couple modifications required but looks good so far!
)?; | ||
let schema = Arc::new(schema_bulider.finish()); | ||
new_schema = Some(schema.clone()); | ||
if schema != snapshot.input_schema()? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might give false positives. A while ago I made merge_arrow_schema
, pass through the large/view types. But the input_schema()
will actually have small types.
I think we should do the not_eq comparison when it's a Delta Schema (StructType). Can you also add a test where merge_schema is True, where we write Large or View types to a table but without any new columns. Then the result shouldn't have a new schema action in the log history
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ion-elgreco is metrics.num_target_files_added the right field to see any new schema actions? or is there another way to see an added actions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You would want to read the commit file I think, something like this should work
let actions = crate::logstore::get_actions(version, self.read_commit_entry(version).await).await;
{ | ||
if target_schema.field_from_column(columns).is_err() { | ||
let new_fields = source_schema.field_with_unqualified_name(columns.name())?; | ||
ending_schema.push(new_fields.to_owned().with_nullable(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before we insert the new fields in the schema, we should actually do some safety checks on the metadata. We cannot add new fields which has generated columns enabled by adding generated expressions, you can look in the recent PR of generated columns how this is prevented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like return a error message when the source data has a generated columns and the end user wants to add it via schema evolution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JustinRush80 exactly
.filter(|ops| matches!(ops.r#type, OperationType::Update | OperationType::Insert)) | ||
.flat_map(|ops| ops.operations.keys()) | ||
{ | ||
if target_schema.field_from_column(columns).is_err() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about schema evolution in nested types such as structs? I think field_from_column looks at top level fields, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe so but I will added a unit test and refactor if needed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this process works for struct!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also new fields within an existing struct?
Because above we only do this snapshot.schema().index_of(name).is_none()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ohh yea let me check that!
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
Signed-off-by: JustinRush80 <[email protected]>
7c88080
to
04cae01
Compare
Signed-off-by: JustinRush80 <[email protected]>
// merge_arrow_schema is used to tell whether the two schema can be merge but we use the operation statement to pick new columns | ||
// this avoid the side effect of adding unnessary columns (eg. target.id = source.ID) "ID" will not be added since "id" exist in target and end user intended it to be "id" | ||
let mut new_schema = None; | ||
let mut schema_actions: Vec<Action> = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super small nit, but can we make this Option instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the schema_actions? that make senses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the schema action
@JustinRush80 this is shaping up really nice! |
Description
Add schema evolution (only merge) to the MERGE statement. New columns are added based on the columns predicates in the MERGE operations (eg. target.id = source.id). Using when_not_matched_insert_all and when_matched_update_all will add any new column to the target schema
Related Issue(s)
Documentation