

support append commit #982

Closed
wants to merge 1 commit

Conversation

LiWeiJie (Contributor)

[rust] allow append commit in dataset
[python] fix the missing mode in the dataset commit interface


@wjones127 (Contributor) left a comment


It seems like we need better validation when making these commits. We can't have duplicate fragment ids, because that breaks the uniqueness of row ids.
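To make the concern concrete, here is a toy sketch (not Lance's actual implementation; the 32-bit shift and the helper names are assumptions for illustration): if row ids are derived from the fragment id plus a row offset, two fragments sharing an id produce colliding row ids, so a commit path could reject duplicate ids up front.

```python
# Illustration only: a toy model of fragment-id-based row addressing.
# The shift width and function names are assumptions, not Lance's API.

def row_id(fragment_id: int, offset: int) -> int:
    """Toy row id: high bits hold the fragment id, low bits the row offset."""
    return (fragment_id << 32) | offset

def validate_unique_fragment_ids(fragment_ids) -> None:
    """Reject a commit whose fragment list repeats an id."""
    seen = set()
    for fid in fragment_ids:
        if fid in seen:
            raise ValueError(f"duplicate fragment id: {fid}")
        seen.add(fid)

# Two distinct fragments that share id 0 collide on their first row:
assert row_id(0, 0) == row_id(0, 0)
validate_unique_fragment_ids([0, 1, 2])  # passes; [0, 1, 0] would raise
```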

    let schema = if object_store.exists(&latest_manifest).await? {
        let dataset = Self::open(base_uri).await?;
        version = dataset.version().version + 1;

        if matches!(mode, WriteMode::Append) {
            // Append mode: inherit indices from previous version.
            indices = dataset.load_indices().await?;
            dataset_fragments = dataset.fragments().iter().map(|f| f.clone()).collect();

This will create duplicate paths in the current write append code path. The old fragments are already added here:

lance/rust/src/dataset.rs, lines 355 to 358 in d3e8153:

    let mut fragments: Vec<Fragment> = if matches!(params.mode, WriteMode::Append) {
        dataset
            .as_ref()
            .map_or(vec![], |d| d.manifest.fragments.as_ref().clone())

This is a breaking change in behavior of a public API, so I think I'd rather not make this change here if we can avoid it. But if we do make this change, we need to adjust the other code path as well, and add tests to make sure we aren't creating duplicate fragment entries.


BTW, how are you generating fragment ids to make them unique? We might want to add validation to this function to make sure we aren't creating duplicate IDs.
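One simple id-assignment scheme, sketched below (the helper name is hypothetical, not part of Lance), is to continue numbering after the largest id already in the manifest, which keeps ids unique as long as every writer goes through the same path:

```python
# Hypothetical helper: assign ids to new fragments by continuing
# after the largest id already present in the manifest.

def assign_fragment_ids(existing_ids, num_new: int) -> list[int]:
    """Return num_new fresh ids, starting just past the current maximum."""
    next_id = max(existing_ids, default=-1) + 1
    return list(range(next_id, next_id + num_new))

# With fragments 0..2 already committed, three new fragments get 3, 4, 5.
```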

@@ -603,7 +603,7 @@ def _commit(
     base_uri = str(base_uri)
     if not isinstance(new_schema, pa.Schema):
         raise TypeError(f"schema must be pyarrow.Schema, got {type(new_schema)}")
-    _Dataset.commit(base_uri, new_schema, fragments)
+    _Dataset.commit(base_uri, new_schema, fragments, mode)

+1 to this change.

Comment on lines +1151 to +1156:

    let fragments: Vec<Fragment> = dataset.fragments().iter().map(|f| f.clone()).collect();

    let new_dataset =
        Dataset::commit(test_uri, dataset.schema(), &fragments, WriteMode::Append)
            .await
            .unwrap();

I don't think this should be allowed because the added fragments have the same ids as the old ones.

@wjones127 (Contributor)

Perhaps what we want instead is a new struct to represent uncommitted fragments that doesn't have an id:

struct NewFragment {
    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted row ids.
    pub deletion_file: Option<DeletionFile>,
}

And then some function to append those to the log, adding new ids as appropriate:

impl Dataset {
    async fn append_new_fragments(dataset_uri: &str, new_fragments: &[NewFragment]) -> Result<Self> {
        ...
    }
}

That way we can handle the assignment of new fragment ids inside the function.
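The intended id handling can be sketched in Python terms (names and dict layout are hypothetical illustrations, not the eventual API): merge uncommitted, id-less fragments into the manifest, assigning each a fresh id past the current maximum.

```python
# Hypothetical sketch of the append logic described above.
# Fragments are modeled as plain dicts; this is not Lance's data model.

def append_new_fragments(manifest_fragments: list[dict], new_fragments: list[dict]) -> list[dict]:
    """Merge uncommitted (id-less) fragments into a manifest, assigning fresh ids."""
    next_id = max((f["id"] for f in manifest_fragments), default=-1) + 1
    appended = []
    for nf in new_fragments:
        appended.append({"id": next_id, **nf})
        next_id += 1
    return manifest_fragments + appended

manifest = [{"id": 0, "files": ["a.lance"]}, {"id": 1, "files": ["b.lance"]}]
merged = append_new_fragments(manifest, [{"files": ["c.lance"]}])
# The new fragment receives id 2; existing entries are untouched.
```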

@eddyxu what do you think of that?

@wjones127 (Contributor)

@LiWeiJie closing this since we've implemented support in _commit for Append and several other operations in #1193

You can see an example of append here:

table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)
fragment = lance.fragment.LanceFragment.create(base_dir, table)
append = lance.LanceOperation.Append([fragment])
with pytest.raises(OSError):
    # Must specify read version
    dataset = lance.LanceDataset._commit(base_dir, append)
dataset = lance.LanceDataset._commit(base_dir, append, read_version=1)
tbl = dataset.to_table()
expected = pa.Table.from_pydict(
    {
        "a": list(range(100)) + list(range(100)),
        "b": list(range(100)) + list(range(100)),
    }
)
assert tbl == expected

@wjones127 wjones127 closed this Sep 18, 2023