feat: allow blob in write_fragments
#3235
Conversation
Force-pushed from 71c464c to e0784b9.
@westonpace Could you review it please? I'm wondering if the change in …
Force-pushed from 5c6da44 to 56cda28.
Thanks for working on this! Overall I think this looks like it is going in the right direction.
I have a few cleanup suggestions. Also, this will need a unit test before we can merge; `test_commit_batch_append` in `test_dataset.py` should be pretty close.
I think the problem is that it's not possible to modify the field ids of the schema on the Python side.
Shouldn't the id of the field remain the same? An append should only add new fragments; it should not modify the schema in any way.
If you can make a unit test (even if it's a failing one) then I can debug further. At the moment it isn't clear to me exactly how you plan to use this, so it is a bit hard to debug.
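For reference, a minimal sketch of what such a test could look like (hypothetical: the test name, the blob column setup, and the `return_transaction` flag follow the shapes discussed later in this thread, not a confirmed API):

```python
import lance
import pyarrow as pa


def test_write_fragments_with_blobs(tmp_path):
    # A table with one regular column and one blob-storage-class column.
    schema = pa.schema(
        [
            pa.field("idx", pa.uint64()),
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={"lance-schema:storage-class": "blob"},
            ),
        ]
    )
    tbl = pa.table(
        {"idx": list(range(10)), "blobs": [b"0" * 1024] * 10}, schema=schema
    )

    # Write the fragments and commit them in a single transaction.
    transaction = lance.fragment.write_fragments(
        tbl, tmp_path / "ds", return_transaction=True
    )
    ds = lance.LanceDataset.commit(tmp_path / "ds", transaction)
    assert ds.count_rows() == 10
```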
python/src/fragment.rs (outdated):

```rust
@@ -513,6 +513,67 @@ pub fn write_fragments(
        .collect()
}

#[pyfunction(name = "_write_fragments_with_blobs")]
#[pyo3(signature = (dest, reader, **kwargs))]
pub fn write_fragments_with_blobs(
```
Is there any reason we can't modify `write_fragments` above instead of introducing a new method? It's ok for there to be breaking changes at the Rust level because the public API is the `write_fragments` in `fragment.py`.
python/src/dataset.rs (outdated):

```rust
let min_field_id = fragments.iter()
    .flat_map(|fragment| &fragment.files)
    .flat_map(|file| &file.fields)
    .min()
    .copied();
let new_schema = if let Some(min_id) = min_field_id {
    let filtered_fields: Vec<Field> = schema.fields
        .iter()
        .filter(|f| f.id >= min_id)
        .cloned()
        .collect();

    if filtered_fields.is_empty() {
        return Err(PyValueError::new_err(format!(
            "No fields in schema have field_id >= {}",
            min_id
        )));
    }

    Schema {
        fields: filtered_fields,
        metadata: schema.metadata.clone(),
    }
} else {
    schema
};
```
How are these changes related? Are you doing an overwrite operation?
python/python/lance/dataset.py (outdated):

```python
if isinstance(operation, Transaction):
    new_ds = _Dataset.commit_transaction(
        base_uri,
        operation,
        commit_lock,
        storage_options=storage_options,
        enable_v2_manifest_paths=enable_v2_manifest_paths,
        detached=detached,
        max_retries=max_retries,
    )
```
Hmm... being able to send a transaction in directly seems like it introduces more complexity to the user than needed. Is it possible to modify https://github.com/lancedb/lance/blob/main/python/python/lance/dataset.py#L2494 (the `Append` operation) so that it can take in a second list of blob fragments?

```python
@dataclass
class Append(BaseOperation):
    fragments: Iterable[FragmentMetadata]
    blob_fragments: Iterable[FragmentMetadata] = field(default_factory=list)
```
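Usage could then look something like this (a sketch of the proposed API; `blob_fragments` does not exist yet, and `default_frags`, `blob_frags`, and `uri` are assumed from the test snippet later in this thread):

```python
import lance

# Hypothetical: commit regular and blob fragments through a single Append op.
op = lance.LanceOperation.Append(
    fragments=default_frags,
    blob_fragments=blob_frags,
)
ds = lance.LanceDataset.commit(uri, op, read_version=0)
```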
I have found that the current implementation still has some issues, but let me first outline the difficulties I am encountering. I am using the code below for testing; it currently needs to run on the modified version.

```python
import shutil

import lance
import pyarrow as pa


def make_table(offset, num_rows, big_val):
    end = offset + num_rows
    values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
    idx = pa.array(range(offset, end), pa.uint64())
    table = pa.record_batch(
        [idx, values],
        schema=pa.schema(
            [
                pa.field("idx", pa.uint64()),
                pa.field(
                    "blobs",
                    pa.large_binary(),
                    metadata={
                        "lance-schema:storage-class": "blob",
                    },
                ),
            ]
        ),
    )
    return table


tbl = make_table(0, 10, b"0" * 1024 * 1024)
uri = "test1"
shutil.rmtree(uri, ignore_errors=True)

default_frags, blob_frags = lance.fragment.write_fragments(
    pa.Table.from_batches([tbl]),
    uri,
    with_blobs=True,
    enable_move_stable_row_ids=True,
)

blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
operation = lance.LanceOperation.Overwrite(tbl.schema, default_frags)
transaction = lance.Transaction(
    operation=operation,
    blobs_op=blob_operation,
    read_version=0,
)
ds = lance.LanceDataset.commit(
    uri,
    transaction,
    enable_v2_manifest_paths=True,
)
ds._take_rows(range(10))
```

The problem lies in the Rust implementation where the `operation._to_inner` method is called. Additionally, the two fragment lists returned by `write_fragments` retain the column index information, so the `Overwrite` operation is expected to correctly preserve those indices in the schema:

```python
blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
blob_operation._to_inner()  # this returns the correct schema
```

I confirmed it's correct via:

```python
lance.write_dataset(tbl, "test2", tbl.schema)
lance.LanceDataset("test2/_blobs").lance_schema
```

The key of the problem is the id assignment in `set_field_id`:

```rust
pub fn set_field_id(&mut self, max_existing_id: Option<i32>) {
    let schema_max_id = self.max_field_id().unwrap_or(-1);
    let max_existing_id = max_existing_id.unwrap_or(-1);
    let mut current_id = schema_max_id.max(max_existing_id) + 1;
    self.fields
        .iter_mut()
        .for_each(|f| f.set_id(-1, &mut current_id));
}
```

With a schema containing only the blob column:

```python
blob_operation = lance.LanceOperation.Overwrite(
    pa.schema(
        [
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={
                    "lance-schema:storage-class": "blob",
                },
            ),
        ]
    ),
    blob_frags,
)
blob_operation._to_inner()  # will return …
```
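To make the mismatch concrete, here is a hypothetical illustration (assuming ids are assigned sequentially from 0, mirroring `set_field_id` above):

```python
import pyarrow as pa

# Full two-column schema: ids are assigned in order, so "blobs" gets id 1,
# matching the field ids recorded in blob_frags.
full = pa.schema(
    [pa.field("idx", pa.uint64()), pa.field("blobs", pa.large_binary())]
)
# After set_field_id: idx -> 0, blobs -> 1

# Blob-only schema: numbering restarts from 0, so "blobs" gets id 0, which
# no longer matches the id stored in the blob fragments (1), and _to_inner
# produces the wrong schema.
blob_only = pa.schema([pa.field("blobs", pa.large_binary())])
# After set_field_id: blobs -> 0
```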
Force-pushed from 56cda28 to eb0a770.
So you are using …
That would be a better way. So do you think we should change the definition to `schema: LanceSchema | pa.Schema`? The `Append` operation should be just fine, but I haven't done any tests.
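For example, something like this (a sketch of the widened annotation, shown on `Overwrite` for illustration; the import paths are assumptions, not confirmed API):

```python
from dataclasses import dataclass
from typing import Iterable, Union

import pyarrow as pa

from lance import LanceSchema  # assumed import path, for illustration only
from lance.fragment import FragmentMetadata


@dataclass
class Overwrite:
    # Proposed: accept either the internal LanceSchema or a pyarrow schema.
    new_schema: Union[LanceSchema, pa.Schema]
    fragments: Iterable[FragmentMetadata]
```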
That's actually being done right now in #3240.
Yes, the translation between Rust and Python for …
Force-pushed from eb0a770 to 16918f7.
Force-pushed from 16918f7 to bcb040e.
@westonpace I rewrote the implementation and added a test for it. I think just returning a …
Force-pushed from e4a3bde to 66bb8d2.
The main change I'm requesting is to modify the existing `write_fragments` instead of creating a second mirror `write_fragments_transaction` method. The other comments are minor.
Can you help me understand your motivations a bit? Why is this preferable to writing and committing all in one step (using `LanceDataset::insert`)?
```python
@@ -679,6 +676,7 @@ def write_fragments(
    data: ReaderLike,
    dataset_uri: Union[str, Path, LanceDataset],
    schema: Optional[pa.Schema] = None,
    return_transaction: bool = False,
```
Can we put this after the `*` (so it's keyword-only)?
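That is, something like this (a sketch of the suggested parameter ordering; the surrounding parameters are abridged):

```python
from typing import Optional

import pyarrow as pa


def write_fragments(
    data,
    dataset_uri,
    schema: Optional[pa.Schema] = None,
    *,
    return_transaction: bool = False,
    # ...remaining keyword-only arguments unchanged...
):
    ...
```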
python/python/tests/test_balanced.py (outdated):

```python
    transaction.blobs_op.new_schema, transaction.blobs_op.fragments
)

ds = lance.LanceDataset.commit(
```
Suggested change:

```diff
-ds = lance.LanceDataset.commit(
+lance.LanceDataset.commit(
```

Minor nit
```rust
@@ -388,6 +388,39 @@ pub fn write_fragments(
    Ok(export_vec(reader.py(), &fragments))
}

#[pyfunction(name = "_write_fragments_transaction")]
#[pyo3(signature = (dest, reader, **kwargs))]
pub fn write_fragments_transaction(
```
Instead of creating a second method here that mirrors the first (difficult to maintain), can we add `return_transaction` to the `write_fragments` method?
```python
transaction = lance.fragment.write_fragments(
    tbl,
    tmp_path / "ds",
    enable_move_stable_row_ids=True,
    return_transaction=True,
)
operation = lance.LanceOperation.Overwrite(
    transaction.operation.new_schema, transaction.operation.fragments
)
blob_operation = lance.LanceOperation.Overwrite(
    transaction.blobs_op.new_schema, transaction.blobs_op.fragments
)
```
Why do we get an overwrite transaction here? Isn't the default mode set to append?
Note: I'm not sure if this is related to the PR or just how things worked previously but was masked by the fact we were ignoring the txn op. We can figure it out in a future PR.
python/python/tests/test_balanced.py
Outdated
transaction.blobs_op.new_schema, transaction.blobs_op.fragments | ||
) | ||
|
||
ds = lance.LanceDataset.commit( |
This should be done in a future PR, but maybe (now that we can serialize `Transaction`) we need a `LanceDataset.commit_transaction` that can just take in a transaction object instead of requiring us to pass in all the pieces.
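A sketch of what that might look like (hypothetical API; `LanceDataset.commit_transaction` does not exist at this point, and `tbl`/`uri` are assumed from the earlier test code):

```python
import lance

# Write fragments and get a Transaction object back...
transaction = lance.fragment.write_fragments(
    tbl,
    uri,
    return_transaction=True,
)
# ...then commit the whole transaction in one call.
ds = lance.LanceDataset.commit_transaction(uri, transaction)
```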
I completely agree with modifying the existing method rather than introducing a new one. However, I'm not very familiar with Rust, and these two methods need to return different output types, so I'm not sure if it's safe to make such changes. As for your second point, are you referring to the motivation behind this PR? My goal is to use `write_fragments` to handle all writes and then commit them in a single step to avoid potential conflicts. Or is there another issue with how I've implemented this? I'm still new to Lance; however, I don't find any …
Good point. We probably do need two methods but we can cut down on the repetition a bit. Let me make an attempt.
Got it. This makes sense and is helpful. Yes, you will get fewer conflicts this way, especially if you have concurrent writers.
I was thinking of `write` (lance/python/python/lance/dataset.py, line 1175 at a6aadaf).
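For context, the one-step path is roughly (a sketch; `tbl` and `uri` are assumed from the earlier test code):

```python
import lance

# Write and commit in a single step instead of write_fragments + commit.
ds = lance.write_dataset(tbl, uri, mode="append")
```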
Ok, I created a …
Thanks for your help!
Allow users to use `write_fragments` for Lance datasets with the blob storage class, by returning a nullable list of blob operations.
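Putting it together, the resulting workflow looks roughly like this (a sketch based on the test code in this thread; the dataset path "my_dataset" is an example):

```python
import lance
import pyarrow as pa

# Build a table with a blob-storage-class column.
tbl = pa.table(
    {"idx": list(range(10)), "blobs": [b"0" * 1024] * 10},
    schema=pa.schema(
        [
            pa.field("idx", pa.uint64()),
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={"lance-schema:storage-class": "blob"},
            ),
        ]
    ),
)

# write_fragments returns a Transaction (including the blob op) that can be
# committed in a single step.
transaction = lance.fragment.write_fragments(
    tbl,
    "my_dataset",
    enable_move_stable_row_ids=True,
    return_transaction=True,
)
ds = lance.LanceDataset.commit("my_dataset", transaction)
```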