feat: add schema evolution to merge statement #3136
base: main
Conversation
Force-pushed from fcc92b2 to 7f1b955
@JustinRush80 can you rebase your branch against main, or allow us to rebase it? I will do a thorough review tomorrow then :)
Force-pushed from 6daa9ec to 0a45cf0
@JustinRush80 could you rebase again? Something went wrong, since the files-changed diff is huge.
Force-pushed from 0a45cf0 to 53042d8
Codecov Report — Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##             main    #3136      +/- ##
==========================================
+ Coverage   71.73%   72.12%   +0.38%
==========================================
  Files         138      138
  Lines       44362    45087     +725
  Branches    44362    45087     +725
==========================================
+ Hits        31825    32520     +695
- Misses      10496    10504       +8
- Partials     2041     2063      +22
Thanks a lot for picking this up! Just a couple of modifications required, but it looks good so far!
python/deltalake/table.py (Outdated)

@@ -972,6 +972,7 @@ def merge(
    predicate: str,
    source_alias: Optional[str] = None,
    target_alias: Optional[str] = None,
    schema_mode: Optional[str] = None,
Let's make this simply "merge_schema: bool = False", since we only have one mode :)
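The suggestion above can be sketched as a signature change. This is a minimal illustrative stub, not the actual deltalake implementation: the single string-valued `schema_mode` parameter becomes a boolean flag, since only one evolution mode ("merge") exists.

```python
from typing import Optional


# Hypothetical sketch of the suggested signature: a boolean merge_schema
# flag replaces the string schema_mode, since only one mode is supported.
def merge(
    predicate: str,
    source_alias: Optional[str] = None,
    target_alias: Optional[str] = None,
    merge_schema: bool = False,  # was: schema_mode: Optional[str] = None
) -> dict:
    # Stand-in body: just echo the parsed options.
    return {"predicate": predicate, "merge_schema": merge_schema}
```

The design upside is that an impossible state (`schema_mode="overwrite"`) can no longer be expressed; a boolean default of `False` also keeps the current behavior backward compatible.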
I see! For both the Python and Rust APIs, or just the Python API?
For both
assert last_action["operation"] == "MERGE"
assert result == expected
Can you add an assert on the new schema from the DeltaTable, as a sanity check, in all the tests?
)?;
let schema = Arc::new(schema_builder.finish());
new_schema = Some(schema.clone());
if schema != snapshot.input_schema()? {
This might give false positives. A while ago I made merge_arrow_schema pass through the large/view types, but input_schema() will actually have the small types.
I think we should do the not_eq comparison when it's a Delta schema (StructType). Can you also add a test where merge_schema is True and we write Large or View types to a table, but without any new columns? Then the result shouldn't have a new schema action in the log history.
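The false-positive problem above comes from comparing physical Arrow types directly: a source with `large_string` compares unequal to a target with `string` even though the logical Delta schema is identical. A toy sketch (type names as plain strings; helper names are illustrative, not delta-rs APIs) of normalizing large/view variants before the not-eq check:

```python
# Map Arrow large/view variants to their "small" logical equivalents,
# mirroring how a Delta StructType treats them as the same type.
LARGE_TO_SMALL = {
    "large_string": "string",
    "large_binary": "binary",
    "string_view": "string",
    "binary_view": "binary",
}


def normalize(fields: dict) -> dict:
    """Replace large/view types with their small logical counterparts."""
    return {name: LARGE_TO_SMALL.get(t, t) for name, t in fields.items()}


def schema_changed(source: dict, target: dict) -> bool:
    """Compare schemas at the logical level, ignoring large/view variants."""
    return normalize(source) != normalize(target)
```

With this normalization, writing large/view data to a table with no new columns correctly reports no schema change, which is what the suggested regression test would assert.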
@ion-elgreco is metrics.num_target_files_added the right field to see any new schema actions? Or is there another way to see an added action?
You would want to read the commit file, I think. Something like this should work:
let actions = crate::logstore::get_actions(version, self.read_commit_entry(version).await).await;
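To make the check concrete: in the Delta log, a commit that changed the table schema carries a `metaData` action in its JSON entry, so the test can assert on its presence or absence. A minimal sketch (not the delta-rs API) over raw commit-file lines:

```python
import json


# A Delta commit file is newline-delimited JSON, one action per line.
# A schema change shows up as a "metaData" action in that commit.
def commit_has_schema_action(commit_lines: list) -> bool:
    return any("metaData" in json.loads(line) for line in commit_lines)
```

A merge that evolved the schema should yield `True`; a merge with no new columns (including the large/view-types case discussed above) should yield `False`.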
{
    if target_schema.field_from_column(columns).is_err() {
        let new_fields = source_schema.field_with_unqualified_name(columns.name())?;
        ending_schema.push(new_fields.to_owned().with_nullable(true));
Before we insert the new fields into the schema, we should actually do some safety checks on the metadata. We cannot add new fields that have generated columns enabled via generated expressions; you can look at the recent generated-columns PR to see how this is prevented.
Like returning an error message when the source data has a generated column and the end user wants to add it via schema evolution?
.filter(|ops| matches!(ops.r#type, OperationType::Update | OperationType::Insert))
.flat_map(|ops| ops.operations.keys())
{
    if target_schema.field_from_column(columns).is_err() {
How about schema evolution in nested types such as structs? I think field_from_column looks at top-level fields, doesn't it?
I believe so, but I will add a unit test and refactor if needed!
Yes, this process works for structs!
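The nested behavior being confirmed can be modeled with a small recursive merge, where plain dicts stand in for Delta StructType fields (a toy sketch, not the PR's implementation): new top-level fields are appended, and fields that are structs on both sides are merged field-by-field.

```python
# Toy model of nested schema evolution: dicts stand in for StructType
# fields. New source fields are added; struct fields merge recursively.
def merge_fields(target: dict, source: dict) -> dict:
    merged = dict(target)
    for name, typ in source.items():
        if name not in merged:
            merged[name] = typ  # brand-new field, added to the target
        elif isinstance(typ, dict) and isinstance(merged[name], dict):
            merged[name] = merge_fields(merged[name], typ)  # nested struct
    return merged
```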
@@ -1714,6 +1829,183 @@ mod tests {

    assert_merge(table, metrics).await;
}
#[tokio::test] | |||
async fn test_merge_with_schema_mode_no_change_of_schema() { |
This one we can extend by checking that it doesn't update the schema if you use large/view Arrow types in the source.
.map(|c| Expr::Column(c.clone()))
.collect_vec(),
)?
.select(select_columns)?
Maybe we don't need to keep track of null columns. I was doing some side improvements while working on streaming support for MERGE; you can see them here:
dead668#diff-12f59fe3c4440b7ae4ee1a5ac810b42c1d7357c246aae7b5770e840e52d3ec52R1218-R1230.
It essentially boils down to projecting early with the required metadata columns to filter down for CDF, and then we drop just these columns after the filter.
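The project-early/drop-after pattern being described can be shown with a toy row-level sketch (the metadata column name is invented for illustration; the real code operates on DataFusion plans, not Python lists): the metadata columns survive through the filter, then only those helper columns are dropped.

```python
# Toy sketch: keep CDF metadata columns through the filter step, then
# drop just those helper columns from the surviving rows.
def filter_then_drop(rows: list, meta_cols: set, predicate) -> list:
    kept = [row for row in rows if predicate(row)]  # filter uses metadata
    return [
        {k: v for k, v in row.items() if k not in meta_cols}  # drop helpers
        for row in kept
    ]
```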
Great! I will take a look and refactor!
Yesterday the changes to the MERGE operation got merged. I have one more change lined up for streaming support, but I don't think that will affect you.
Signed-off-by: JustinRush80 <[email protected]>
Force-pushed from 7c88080 to 04cae01
Description
Add schema evolution (merge only) to the MERGE statement. New columns are added based on the column predicates in the MERGE operations (e.g. target.id = source.id). Using when_not_matched_insert_all and when_matched_update_all will add any new columns to the target schema.
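The rule in the description can be sketched as follows (helper names are illustrative, not the deltalake API): only the `*_all` clauses contribute new source columns to the evolved target schema; targeted clauses leave it unchanged.

```python
# Clauses that trigger schema evolution per the PR description.
EVOLVING_CLAUSES = {"when_not_matched_insert_all", "when_matched_update_all"}


def evolved_schema(target_cols: list, source_cols: list, clauses: list) -> list:
    """Return the target column list after a merge with the given clauses."""
    if not EVOLVING_CLAUSES.intersection(clauses):
        return list(target_cols)  # no *_all clause: schema is unchanged
    # *_all clause present: append source columns missing from the target.
    return list(target_cols) + [c for c in source_cols if c not in target_cols]
```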
Related Issue(s)
Documentation