-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support append commit #982
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we need better validation when we making these commits. We can't have duplicate fragment ids, because that breaks the unique row ids.
let schema = if object_store.exists(&latest_manifest).await? { | ||
let dataset = Self::open(base_uri).await?; | ||
version = dataset.version().version + 1; | ||
|
||
if matches!(mode, WriteMode::Append) { | ||
// Append mode: inherit indices from previous version. | ||
indices = dataset.load_indices().await?; | ||
dataset_fragments = dataset.fragments().iter().map(|f| f.clone()).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will create duplicate paths in the current write append code path. The old fragments are already added here:
Lines 355 to 358 in d3e8153
let mut fragments: Vec<Fragment> = if matches!(params.mode, WriteMode::Append) { | |
dataset | |
.as_ref() | |
.map_or(vec![], |d| d.manifest.fragments.as_ref().clone()) |
This is a breaking change in behavior of a public API, so I think I'd rather not make this change here if we can avoid it. But if we do make this change, we need to adjust the other code path as well, and add tests to make sure we aren't creating duplicate fragment entries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, how are you generating fragment ids to make them unique? We might want to add validation to this function to make sure we aren't creating duplicate IDs.
@@ -603,7 +603,7 @@ def _commit( | |||
base_uri = str(base_uri) | |||
if not isinstance(new_schema, pa.Schema): | |||
raise TypeError(f"schema must be pyarrow.Schema, got {type(new_schema)}") | |||
_Dataset.commit(base_uri, new_schema, fragments) | |||
_Dataset.commit(base_uri, new_schema, fragments, mode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to this change.
let fragments: Vec<Fragment> = dataset.fragments().iter().map(|f| f.clone()).collect(); | ||
|
||
let new_dataset = | ||
Dataset::commit(test_uri, dataset.schema(), &fragments, WriteMode::Append) | ||
.await | ||
.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should be allowed because the added fragments have the same ids as the old ones.
Perhaps what we want instead is a new struct to represent uncommitted fragments that doesn't have the ID struct NewFragment {
/// Files within the fragment.
pub files: Vec<DataFile>,
/// Optional file with deleted row ids.
pub deletion_file: Option<DeletionFile>,
} And then some function to append those to the log, adding new ids as appropriate: impl Dataset {
async fn append_new_fragments(dataset_uri: &str, new_fragments: &[NewFragment]) -> Result<Self> {
...}
} That way we can handle the assignment of new fragment ids inside the function. @eddyxu what do you think of that? |
@LiWeiJie closing this since we've implemented support in You can see an example of append here: lance/python/python/tests/test_dataset.py Lines 399 to 421 in 2ee7f01
|
[rust] allow append commit in dataset
[python] fix the missing mode in the dataset commit interface