feat: support merge by row_id, row_addr #3254
Thanks for working on this @chenkovsky. I would like to see a few improvements to the unit tests, and then this is ready to go.
rust/lance/src/dataset.rs
Outdated
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
If we aren't testing anything about the files, let's use an in-memory dataset instead.
Suggested change:
- let test_dir = tempdir().unwrap();
- let test_uri = test_dir.path().to_str().unwrap();
rust/lance/src/dataset.rs
Outdated
Dataset::write(data, test_uri, Some(write_params.clone()))
    .await
    .unwrap();

let mut dataset = Dataset::open(test_uri).await.unwrap();
If you re-use the dataset instance from write(), you can just use an in-memory dataset:
Suggested change:
- Dataset::write(data, test_uri, Some(write_params.clone()))
-     .await
-     .unwrap();
- let mut dataset = Dataset::open(test_uri).await.unwrap();
+ let dataset = Dataset::write(data, "memory://", Some(write_params.clone()))
+     .await
+     .unwrap();
rust/lance/src/dataset.rs
Outdated
let new_batch =
    RecordBatch::try_new(new_schema.clone(), vec![row_ids.clone(), row_ids.clone()])
        .unwrap();
let new_data = RecordBatchIterator::new(vec![Ok(new_batch)], new_schema.clone());
dataset.merge(new_data, ROW_ID, "rowid").await.unwrap();
dataset.validate().await.unwrap();
I'd like us to assert a few more things in this test:
- dataset has the expected final schema: key, value, new_value.
- The values are what we expect. For this, you should avoid using the same values in each column. Otherwise, the test could pass even if there is a bug that uses the wrong column's values. Right now, you use row_ids.clone() for both rowid and new_value.
- This works even if you shuffle the data. I would recommend using take_record_batch() to reorder the new_batch so the row ids are out-of-order.
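To illustrate why distinct values and shuffled input matter, here is a minimal std-only sketch. It does not use the Lance API: `MergedRow` and `merge_on_row_id` are hypothetical stand-ins, and a row's id is assumed to be its position in the base data. The point is only that the assertions would fail if the merge copied the wrong column or ignored the join key.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one dataset row after the merge.
#[derive(Debug, PartialEq)]
struct MergedRow {
    key: i32,
    value: String,
    new_value: Option<i32>,
}

/// Left-joins `new_rows` (row id, new value) onto `base`.
/// Assumption for this sketch: a row's id is its index in `base`.
fn merge_on_row_id(base: &[(i32, String)], new_rows: &[(u64, i32)]) -> Vec<MergedRow> {
    let lookup: HashMap<u64, i32> = new_rows.iter().copied().collect();
    base.iter()
        .enumerate()
        .map(|(row_id, (key, value))| MergedRow {
            key: *key,
            value: value.clone(),
            new_value: lookup.get(&(row_id as u64)).copied(),
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn merge_survives_shuffled_row_ids() {
        let base = vec![
            (10, "a".to_string()),
            (11, "b".to_string()),
            (12, "c".to_string()),
        ];
        // Row ids arrive out of order, and new_value never equals the row id,
        // so a positional merge or a wrong-column bug cannot pass by accident.
        let new_rows = vec![(2, 300), (0, 100), (1, 200)];

        let merged = merge_on_row_id(&base, &new_rows);

        let keys: Vec<i32> = merged.iter().map(|r| r.key).collect();
        let new_values: Vec<Option<i32>> = merged.iter().map(|r| r.new_value).collect();
        assert_eq!(keys, vec![10, 11, 12]);
        assert_eq!(new_values, vec![Some(100), Some(200), Some(300)]);
    }
}
```

In the real test, the same idea would presumably be expressed by reordering `new_batch` and comparing the scanned columns against expected arrays.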
rust/lance/src/dataset.rs
Outdated
// This test also tests "null filling" when merging (e.g. when keys do not match
// we need to insert nulls)
Where is the null filling? It seems like you are providing every row id, unless I am missing something.
> Where is the null filling? It seems like you are providing every row id, unless I am missing something.
Sorry, I copied and modified another test.
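For context on what the copied comment describes, a test that really exercised null filling would have to leave some keys out of the new data. A minimal std-only sketch of that behaviour follows; `left_join_fill_nulls` is a hypothetical helper, not part of Lance, and `None` stands in for a null.

```rust
use std::collections::HashMap;

/// Left join: every base key is kept, and keys that are absent
/// from `right` get `None` (the "null filled" case).
fn left_join_fill_nulls(base_keys: &[i32], right: &[(i32, &str)]) -> Vec<(i32, Option<String>)> {
    let lookup: HashMap<i32, &str> = right.iter().copied().collect();
    base_keys
        .iter()
        .map(|k| (*k, lookup.get(k).map(|v| v.to_string())))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn missing_keys_become_nulls() {
        // Key 2 is deliberately missing from the right-hand side.
        let joined = left_join_fill_nulls(&[1, 2, 3], &[(1, "x"), (3, "z")]);
        assert_eq!(
            joined,
            vec![
                (1, Some("x".to_string())),
                (2, None),
                (3, Some("z".to_string())),
            ]
        );
    }
}
```

Since the row id test provides every row id, none of its rows would take the `None` path, which is why the comment did not apply there.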
rust/lance/src/dataset.rs
Outdated
#[rstest]
#[tokio::test]
async fn test_merge_on_row_addr(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] use_stable_row_id: bool,
Same comments from the row id test apply here.
updated
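As background for the row address variant of the test, the sketch below shows how a row address can be packed and unpacked. It assumes the usual Lance layout of a 64-bit address with the fragment id in the upper 32 bits and the row offset within the fragment in the lower 32 bits. The function names are hypothetical and this is not the Lance implementation.

```rust
/// Packs a fragment id and a row offset into a 64-bit row address.
/// Assumed layout: fragment id in the upper 32 bits, offset in the lower 32.
fn row_address(fragment_id: u32, row_offset: u32) -> u64 {
    ((fragment_id as u64) << 32) | row_offset as u64
}

/// Extracts the fragment id (upper 32 bits) from a row address.
fn fragment_of(addr: u64) -> u32 {
    (addr >> 32) as u32
}

/// Extracts the row offset (lower 32 bits) from a row address.
fn offset_of(addr: u64) -> u32 {
    addr as u32
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn row_address_round_trips() {
        let addr = row_address(3, 7);
        assert_eq!(fragment_of(addr), 3);
        assert_eq!(offset_of(addr), 7);
        // In fragment 0 the address equals the plain row offset.
        assert_eq!(row_address(0, 5), 5);
    }
}
```

Under this layout, addresses in fragments after the first are large and non-contiguous, so expected values in a row address test cannot simply be written as 0..n once the data spans more than one fragment.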
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3254      +/-   ##
==========================================
+ Coverage   78.47%   78.88%   +0.41%
==========================================
  Files         245      246       +1
  Lines       85088    86568    +1480
  Branches    85088    86568    +1480
==========================================
+ Hits        66772    68292    +1520
+ Misses      15501    15450      -51
- Partials     2815     2826      +11
Looks better. Have a few suggestions to clean up the tests. Then I think this is ready to be merged.
rust/lance/src/dataset.rs
Outdated
let result = dataset
    .scan()
    .try_into_stream()
    .await
    .unwrap()
    .try_collect::<Vec<_>>()
    .await
    .unwrap();
You can use .try_into_batch() to immediately collect the results in one batch. You will need to remove the for loop below too.
Suggested change:
- let result = dataset
-     .scan()
-     .try_into_stream()
-     .await
-     .unwrap()
-     .try_collect::<Vec<_>>()
-     .await
-     .unwrap();
+ let result = dataset
+     .scan()
+     .try_into_batch()
+     .await
+     .unwrap();
rust/lance/src/dataset.rs
Outdated
let key = batch
    .column_by_name("key")
    .unwrap()
    .as_any()
    .downcast_ref::<arrow_array::Int32Array>()
    .unwrap();
For tests, you can use RecordBatch["<column>"] to access a column by name. Also, .as_primitive() is typically shorter. Both are best kept to tests, since they panic on a missing column or a type mismatch.
Suggested change:
- let key = batch
-     .column_by_name("key")
-     .unwrap()
-     .as_any()
-     .downcast_ref::<arrow_array::Int32Array>()
-     .unwrap();
+ let key = batch["key"].as_primitive::<Int32Type>();