
feat: allow blob in write_fragments #3235

Merged 7 commits on Jan 8, 2025

Conversation

@fecet (Contributor) commented Dec 12, 2024

Allow users to use write_fragments for Lance with the blob storage class by returning a nullable list of blob ops.

@github-actions github-actions bot added enhancement New feature or request python labels Dec 12, 2024
@fecet fecet force-pushed the feat/frag-blob branch 2 times, most recently from 71c464c to e0784b9 Compare December 14, 2024 06:00
@fecet (Contributor, Author) commented Dec 14, 2024

@westonpace Could you review it please? I'm wondering if the change in python/src/fragment.rs is valid, as my blob column in the balanced dataset became all None after I read them via _take_rows.

@fecet (Contributor, Author) commented Dec 15, 2024

I think the problem is that it's not possible to modify the field ids of the schema on the Python side.

@fecet fecet marked this pull request as ready for review December 15, 2024 10:16
@westonpace (Contributor) left a comment

Thanks for working on this! Overall I think this looks like it is going in the right direction.

I have a few cleanup suggestions. Also, this will need a unit test before we can merge. test_commit_batch_append in test_dataset.py should be pretty close.

I think the problem is that it's not possible to modify the field ids of the schema on the Python side.

Shouldn't the id of the field remain the same? An append should only add new fragments, it should not modify the schema in any way.

If you can make a unit test (even if it's a failing one) then I can debug further. At the moment it isn't clear to me exactly how you plan to use this, so it is a bit hard to debug.

@@ -513,6 +513,67 @@ pub fn write_fragments(
.collect()
}

#[pyfunction(name = "_write_fragments_with_blobs")]
#[pyo3(signature = (dest, reader, **kwargs))]
pub fn write_fragments_with_blobs(
@westonpace (Contributor):

Is there any reason we can't modify write_fragments above instead of introducing a new method?

It's ok for there to be breaking changes at the rust level because the public API is the write_fragments in fragment.py.

Comment on lines 287 to 315

let min_field_id = fragments.iter()
.flat_map(|fragment| &fragment.files)
.flat_map(|file| &file.fields)
.min()
.copied();
let new_schema = if let Some(min_id) = min_field_id {
let filtered_fields: Vec<Field> = schema.fields
.iter()
.filter(|f| f.id >= min_id)
.cloned()
.collect();

if filtered_fields.is_empty() {
return Err(PyValueError::new_err(format!(
"No fields in schema have field_id >= {}",
min_id
)));
}

Schema {
fields: filtered_fields,
metadata: schema.metadata.clone(),
}
} else {
schema
};

@westonpace (Contributor):

How are these changes related? Are you doing a overwrite operation?

Comment on lines 2225 to 2234
if isinstance(operation, Transaction):
new_ds = _Dataset.commit_transaction(
base_uri,
operation,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
@westonpace (Contributor):

Hmm...being able to send a transaction in directly seems like it introduces more complexity to the user than needed. Is it possible to modify https://github.com/lancedb/lance/blob/main/python/python/lance/dataset.py#L2494 (the Append operation) so that it can take in a second list of blob fragments:

    @dataclass
    class Append(BaseOperation):
        fragments: Iterable[FragmentMetadata]
        # field(default_factory=list) avoids dataclasses' mutable-default error
        blob_fragments: Iterable[FragmentMetadata] = field(default_factory=list)

@fecet (Contributor, Author) commented Dec 17, 2024

I have found that the current implementation still has some issues, but I can first outline the difficulties I am currently encountering:

I am using the code below for testing, and it currently needs to run on the modified version.

import shutil

import lance
import pyarrow as pa


def make_table(offset, num_rows, big_val):
    end = offset + num_rows
    values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
    idx = pa.array(range(offset, end), pa.uint64())
    table = pa.record_batch(
        [idx, values],
        schema=pa.schema(
            [
                pa.field("idx", pa.uint64()),
                pa.field(
                    "blobs",
                    pa.large_binary(),
                    metadata={
                        "lance-schema:storage-class": "blob",
                    },
                ),
            ]
        ),
    )
    return table


tbl = make_table(0, 10, b"0" * 1024 * 1024)
uri = "test1"
shutil.rmtree(uri, ignore_errors=True)

default_frags, blob_frags = lance.fragment.write_fragments(
    pa.Table.from_batches([tbl]),
    uri,
    with_blobs=True,
    enable_move_stable_row_ids=True,
)

blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
operation = lance.LanceOperation.Overwrite(tbl.schema, default_frags)
transaction = lance.Transaction(
    operation=operation,
    blobs_op=blob_operation,
    read_version=0,
)

ds = lance.LanceDataset.commit(
    uri,
    transaction,
    enable_v2_manifest_paths=True,
)
ds._take_rows(range(10))

The problem lies in the Rust implementation where the operation._to_inner method is called. Additionally, the two fragment lists returned by write_fragments retain the column index information, so the Overwrite operation is expected to correctly preserve those indices in the schema.

blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
blob_operation._to_inner()

this will return correct schema

Overwrite { fragments: [Fragment { id: 0, files: [DataFile { path: "823dcbd1-175e-4307-b726-d9fbc775e887.lance", fields: [1], column_indices: [0], file_major_version: 2, file_minor_version: 0 }], deletion_file: None, row_id_meta: None, physical_rows: Some(10) }], schema: Schema { fields: [Field { name: "blobs", id: 1, parent_id: -1, logical_type: LogicalType("large_binary"), metadata: {"lance-schema:storage-class": "blob"}, encoding: Some(VarBinary), nullable: true, children: [], dictionary: None, storage_class: Blob }], metadata: {} }, config_upsert_values: None }

I confirmed it's correct with:

lance.write_dataset(tbl, "test2", tbl.schema)

lance.LanceDataset("test2/_blobs").lance_schema

The key problem is the id in the schema (Schema { fields: [Field { name: "blobs", id: 1, ...): this cannot be reproduced on the Python side, as the ids will be relabeled from zero up to the length of the schema:

pub fn set_field_id(&mut self, max_existing_id: Option<i32>) {
    let schema_max_id = self.max_field_id().unwrap_or(-1);
    let max_existing_id = max_existing_id.unwrap_or(-1);
    let mut current_id = schema_max_id.max(max_existing_id) + 1;
    self.fields
        .iter_mut()
        .for_each(|f| f.set_id(-1, &mut current_id));
}

blob_operation = lance.LanceOperation.Overwrite(pa.schema(
            [
                pa.field(
                    "blobs",
                    pa.large_binary(),
                    metadata={
                        "lance-schema:storage-class": "blob",
                    },
                ),
            ]
        ), blob_frags)
blob_operation._to_inner()

will return schema: Schema { fields: [Field { name: "blobs", id: 0, ... That's why I made the changes in https://github.com/lancedb/lance/pull/3235/files/56cda28424b3934f9ca32b1060e8f47d340639d2#diff-31492cb9a22ac40a820049c18a44ac88f651a5d0d7115fc09386899ab95f0c10. But I agree it's not quite correct; it would be easier if we could provide a way to directly return the transaction and extract the inner schema from there.
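The relabeling described above can be sketched in plain Python. This is a hypothetical, simplified model of the Rust set_field_id behavior quoted earlier (not the actual Lance implementation): a schema rebuilt from a fresh pa.Schema gets its ids reassigned from zero, so the blob field loses the id recorded in the fragment metadata.

```python
# Hypothetical model of sequential field-id assignment (mirrors the quoted
# Rust set_field_id); illustrative only, not the real Lance code.
def relabel_field_ids(field_names, max_existing_id=-1):
    # Ids are handed out sequentially starting after max_existing_id.
    current_id = max_existing_id + 1
    ids = {}
    for name in field_names:
        ids[name] = current_id
        current_id += 1
    return ids

# Full dataset schema: "idx" gets id 0, "blobs" gets id 1.
full = relabel_field_ids(["idx", "blobs"])
# A blob-only schema rebuilt from a fresh pa.Schema: "blobs" gets id 0,
# no longer matching the id 1 recorded in the fragment metadata.
blob_only = relabel_field_ids(["blobs"])
```

This is why rebuilding the schema on the Python side cannot reproduce the original ids: the relabeling has no memory of what the field was called in the full dataset.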

@westonpace (Contributor) commented:

So you are using Overwrite to create a new dataset? I agree that should work. Would it help if write_fragments also returned the schema? I think this is how we work around this problem in Fragment.merge_columns.

@fecet (Contributor, Author) commented Dec 19, 2024

That sounds like a better way. So do you think we should change the definition in LanceOperation so that it can accept a LanceSchema like merge does?

schema: LanceSchema | pa.Schema

The Append operation should be fine, but I haven't done any tests.

@wjones127 (Contributor) commented:

That sounds like a better way. So do you think we should change the definition in LanceOperation so that it can accept a LanceSchema like merge does?

That's actually being done right now in #3240

@fecet (Contributor, Author) commented Dec 19, 2024

That's actually being done right now in #3240

Yes, the translation between Rust and Python for transactions often trips me up. Will this be merged soon?

@fecet fecet closed this Dec 27, 2024
@fecet fecet reopened this Dec 27, 2024
@fecet fecet requested a review from westonpace December 27, 2024 14:21
@fecet (Contributor, Author) commented Dec 27, 2024

@westonpace I rewrote the implementation and added a test for it. I think just returning a transaction would be cleaner for the end user, as it can be converted to Python easily.

@fecet fecet force-pushed the feat/frag-blob branch 2 times, most recently from e4a3bde to 66bb8d2 Compare December 28, 2024 09:00
@fecet (Contributor, Author) commented Jan 5, 2025

Now pytest looks good on my end

@westonpace (Contributor) left a comment

The main change I'm requesting is to modify the existing write_fragments instead of creating a second mirror write_fragments_transaction method. The other comments are minor.

Can you help me understand your motivations a bit? Why is this preferable to writing and committing all in one step (using LanceDataset::insert?)

@@ -679,6 +676,7 @@ def write_fragments(
data: ReaderLike,
dataset_uri: Union[str, Path, LanceDataset],
schema: Optional[pa.Schema] = None,
return_transaction: bool = False,
@westonpace (Contributor):

Can we put this after the *?
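For context, placing the parameter after the bare `*` makes it keyword-only. A minimal sketch (the signature here is trimmed and hypothetical; only the `*` placement is the point):

```python
# Parameters after the bare `*` must be passed by keyword.
def write_fragments(data, dataset_uri, schema=None, *, return_transaction=False):
    return return_transaction

# Keyword call works as expected.
assert write_fragments("data", "uri", return_transaction=True) is True

# Passing it positionally now raises TypeError instead of silently binding:
try:
    write_fragments("data", "uri", None, True)
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

This guards callers against accidentally binding a boolean flag to the wrong positional slot as the signature evolves.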

transaction.blobs_op.new_schema, transaction.blobs_op.fragments
)

ds = lance.LanceDataset.commit(
@westonpace (Contributor):

Suggested change:
-    ds = lance.LanceDataset.commit(
+    lance.LanceDataset.commit(

Minor nit

@@ -388,6 +388,39 @@ pub fn write_fragments(
Ok(export_vec(reader.py(), &fragments))
}

#[pyfunction(name = "_write_fragments_transaction")]
#[pyo3(signature = (dest, reader, **kwargs))]
pub fn write_fragments_transaction(
@westonpace (Contributor):

Instead of creating a second method here that mirrors the first (difficult to maintain) can we add return_transaction to the write_fragments method?

Comment on lines +63 to +74
transaction = lance.fragment.write_fragments(
tbl,
tmp_path / "ds",
enable_move_stable_row_ids=True,
return_transaction=True,
)
operation = lance.LanceOperation.Overwrite(
transaction.operation.new_schema, transaction.operation.fragments
)
blob_operation = lance.LanceOperation.Overwrite(
transaction.blobs_op.new_schema, transaction.blobs_op.fragments
)
@westonpace (Contributor):

Why do we get an overwrite transaction here? Isn't the default mode set to append?

Note: I'm not sure if this is related to the PR or just how things worked previously but it was masked by the fact we were ignoring the txn op. We can figure it out in a future PR.

transaction.blobs_op.new_schema, transaction.blobs_op.fragments
)

ds = lance.LanceDataset.commit(
@westonpace (Contributor):

This should be done in a future PR but maybe (now that we can serialize Transaction) we need a LanceDataset.commit_transaction that can just take in a transaction object instead of requiring us to pass in all the pieces.

@fecet (Contributor, Author) commented Jan 7, 2025

I completely agree with modifying the existing method rather than introducing a new one. However, I'm not very familiar with Rust, and these two methods need to return different output types, so I'm not sure if it's safe to make such changes.

As for your second point, are you referring to the motivation behind this PR? My goal is to use write_fragments to handle all writes and then commit them in a single step to avoid potential conflicts. Or is there another issue with how I've implemented this? I'm still new to Lance. However, I don't find any LanceDataset::insert here; did you mean LanceDataset::write?

@westonpace (Contributor) commented:

However, I'm not very familiar with Rust, and these two methods need to return different output types, so I'm not sure if it's safe to make such changes.

Good point. We probably do need two methods but we can cut down on the repetition a bit. Let me make an attempt.

My goal is to use write_fragments to handle all writes and then commit them in a single step to avoid potential conflicts. Or is there another issue with how I've implemented this, as I'm still new to Lance?

Got it. This makes sense and is helpful. Yes, you will get fewer conflicts this way. Especially if you have concurrent writers.
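The conflict-avoidance argument can be illustrated with a toy commit model (plain Python, no Lance dependency; the class and retry logic are illustrative, not Lance's actual concurrency mechanism): the expensive fragment writing happens outside the critical section, so only a cheap metadata commit ever needs to be retried.

```python
# Toy model of optimistic-concurrency commits: writers prepare fragments
# independently, then race to commit; a stale read_version forces only a
# cheap metadata retry, never a rewrite of the data files.
class ToyDataset:
    def __init__(self):
        self.version = 0
        self.fragments = []

    def commit(self, fragments, read_version):
        if read_version != self.version:
            raise RuntimeError("commit conflict: dataset changed")
        self.fragments.extend(fragments)
        self.version += 1
        return self.version

ds = ToyDataset()
frags_a, frags_b = ["frag-a"], ["frag-b"]   # written concurrently, uncommitted

ds.commit(frags_a, read_version=0)          # writer A wins the race
try:
    ds.commit(frags_b, read_version=0)      # writer B's view is now stale
except RuntimeError:
    ds.commit(frags_b, read_version=ds.version)  # cheap retry succeeds
```

Writing and committing in one step would hold the "race window" open for the entire data write; splitting them shrinks the window to the metadata commit alone.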

However, I don't find any LanceDataset::insert here; did you mean LanceDataset::write?

I was thinking of

but it's pretty much just a thin wrapper around write.

@westonpace (Contributor) commented:

Ok, I created a do_write_fragments to house the code common to the two paths. If this passes tests we can merge.

@westonpace westonpace merged commit 397edeb into lancedb:main Jan 8, 2025
12 checks passed
@westonpace (Contributor) commented:

Thanks for your help!
