Add virtual ref support EAR-1183 #85
Conversation
Force-pushed from 02a13d9 to 9c00af0 (compare)
icechunk/src/dataset.rs
Outdated
assert_eq!(
    ds.get_chunk(&new_array_path, &ChunkIndices(vec![0, 0, 0]), &range)
        .await?,
    Some(range.slice(bytes1.clone()))
assuming seba did the math right ;)
icechunk/src/storage/object_store.rs
Outdated
match has_key {
    true => self.get_chunk_from_cached_store(&cache_key, &path, options).await,
    false => {
        let builder = match scheme {
object_store's parse_from_url does not pull from the env by default, so I do our own matching here. Assuming that env credentials are fine for now...
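For reference, a minimal sketch of that kind of scheme matching with env-derived credentials (the surrounding function, its signature, and the error handling are assumptions for illustration; only the object_store builder calls are real API):

use std::sync::Arc;

use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::ObjectStore;

// Hypothetical sketch: pick a builder by URL scheme and pull credentials from
// the environment, since the generic URL parsing does not do that by default.
fn store_for_scheme(scheme: &str, bucket: &str) -> Result<Arc<dyn ObjectStore>, String> {
    match scheme {
        "s3" => AmazonS3Builder::from_env()
            .with_bucket_name(bucket)
            .build()
            .map(|store| Arc::new(store) as Arc<dyn ObjectStore>)
            .map_err(|e| e.to_string()),
        "gcs" => GoogleCloudStorageBuilder::from_env()
            .with_bucket_name(bucket)
            .build()
            .map(|store| Arc::new(store) as Arc<dyn ObjectStore>)
            .map_err(|e| e.to_string()),
        other => Err(format!("unsupported scheme: {other}")),
    }
}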
* main:
  Bump the rust-dependencies group with 2 updates
  Better python creation, open, and storage API [EAR-1316] (#91)
  Add test checking only 1 concurrent commit can succeed
  Make local filesystem Storage implementation work
  Cache assets independently and by count, not by mem
  Disallow empty commits
  Delete dbg! call
  Expand S3 config, some nicer python init methods [EAR-1314][EAR-1311] (#84)
  Wrap empty and get methods in future_into_py
  add README
Force-pushed from 23613ce to f1e05c7 (compare)
Force-pushed from 6c2e515 to 4f52f1d (compare)
icechunk/src/dataset.rs
Outdated
assert_eq!(
    ds.get_chunk(&new_array_path, &ChunkIndices(vec![0, 0, 1]), &ByteRange::ALL)
        .await?,
    Some(Bytes::copy_from_slice(&bytes2[1..6])),
lovely test
Force-pushed from b4852c5 to d21231e (compare)
Force-pushed from 6094696 to d6241f4 (compare)
)
.await
.map(|bytes| Some(ready(Ok(bytes)).boxed()))?)
}
I think this is good at the type level but wrong semantically (which impacts performance). The idea with this get_chunk_reader design is that we want to hold the reference to Self for as little time as possible. We don't want to hold it the whole time the bytes are being downloaded from S3: if we did, nobody could write a new reference to the dataset until the download finished, which is a completely independent thing.

So, the way it works is: this function does the minimum it needs to do while holding the ref, and then returns a future that will do the actual work of downloading bytes when awaited. If you look above at the materialized case, we only resolve the chunk_ref and clone a pointer; that's enough to set up the Future we return.

You should do something similar: clone your virtual resolver and "move" it into a Future that you return without awaiting.
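A minimal sketch of that pattern, using stand-in types (VirtualResolver, VirtualChunkRef, fetch, and the field names are placeholders, not the actual icechunk types):

use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;

// Hypothetical stand-ins for the real icechunk types; the borrowing pattern is the point here.
#[allow(dead_code)]
struct VirtualChunkRef {
    location: String,
    offset: u64,
    length: u64,
}

struct VirtualResolver;

impl VirtualResolver {
    async fn fetch(&self, _chunk: &VirtualChunkRef) -> Result<Bytes, String> {
        // ...the actual object-store GET would happen here...
        Ok(Bytes::new())
    }
}

struct Dataset {
    virtual_resolver: Arc<VirtualResolver>,
}

impl Dataset {
    // Holds the borrow of `self` only long enough to clone an Arc and move it
    // into the returned future; the download itself happens when the caller
    // awaits that future, with no reference to `self` held.
    fn get_chunk_reader(
        &self,
        chunk_ref: VirtualChunkRef,
    ) -> BoxFuture<'static, Result<Bytes, String>> {
        let resolver = Arc::clone(&self.virtual_resolver);
        async move { resolver.fetch(&chunk_ref).await }.boxed()
    }
}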
Yes I was being dumb, sorry. Thanks for taking the time to write it out.
match Key::parse(key)? {
    Key::Metadata { .. } => Err(StoreError::NotAllowed(format!(
        "use .set to modify metadata for key {}",
👏
@@ -511,6 +514,36 @@ impl Store {
        }
    }

    // alternate API would take array path, and a mapping from string coord to ChunkPayload
    pub async fn set_virtual_ref(
Actually, I wonder who would use this method, because zarr won't, right? I'm confused about this: let's take @TomNicholas's use case for inserting virtual refs. Which of these two alternatives would be easier for him?

store.set_virtual_ref("group/array/c/0/1/2", "s3://......")
# or
store.set_virtual_ref("/group/array", (0,1,2), "s3://.....")

Will virtual ref clients speak in the low-level language of zarr keys, or in the higher-level language of arrays and coordinates? Of course, we can also offer both alternatives.
Will virtual ref clients speak in the low level language of zarr keys?
The virtualizarr ChunkManifest objects store refs internally in numpy arrays, with the ref for each chunk accessible via indexing with the zarr chunk key. (So internally the zarr key tuple (0,1,2) is used 3 times to index into 3 numpy arrays containing the path, offset and length.)
Which of these two alternatives would be easier for him?
The second one is marginally easier, but either would be fine. The second one is easier because virtualizarr ManifestArray/ChunkManifest objects are unaware of their own variable name or group. The array name information is stored in the xarray.Dataset object (as the key mapping to the xr.Variable), and non-root groups haven't really been properly tried yet but would in general become node names in an xr.DataTree (see zarr-developers/VirtualiZarr#84).
What I think I really want to be able to do is set an entire array's worth of virtual references with one call. Otherwise I'm just going to end up looping over the array elements anyway with something like:
import numpy as np

def manifestarray_to_icechunk(group: str, arr_name: str, ma: ManifestArray, store: IcechunkStore) -> None:
    # loop over every reference in the ChunkManifest for that array
    it = np.nditer(
        [ma.manifest._paths, ma.manifest._offsets, ma.manifest._lengths],
        flags=['multi_index'],
    )
    for entry in it:
        # set each reference individually
        store.set_virtual_ref(
            f"{group}/{arr_name}",
            it.multi_index,  # your (0,1,2) tuple
            entry[0],  # filepath for this element
            entry[1],  # offset for this element
            entry[2],  # length for this element
        )
Presumably you can do that more efficiently / concurrently at the rust level than I can at the python level.
The only exception to that desire to write whole arrays at once might be the case of appending, discussed in #104 (comment).
@paraseba we chatted about this today and I thought we decided that iterating in python was fine for a first pass. So my mental sketch is:
- loop in python
- pass each reference (simple types) to the python IcechunkStore
- IcechunkStore sends the reference to PyIcechunkStore
- which processes it into the right types and sends it to Dataset (a rough sketch of that last hand-off is below)
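Roughly like this, assuming the future_into_py style already used elsewhere in this PR (every name, signature, and type below is an assumption for illustration, not the real bindings):

use std::sync::Arc;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use tokio::sync::RwLock;

// Hypothetical stand-in for the rust-side Dataset and its virtual ref setter.
struct Dataset;

impl Dataset {
    async fn set_virtual_ref(
        &mut self,
        _array_path: String,
        _index: Vec<u32>,
        _location: String,
        _offset: u64,
        _length: u64,
    ) -> Result<(), String> {
        Ok(())
    }
}

#[pyclass]
struct PyIcechunkStore {
    dataset: Arc<RwLock<Dataset>>,
}

#[pymethods]
impl PyIcechunkStore {
    // Receives simple python types, converts them, and defers the async
    // Dataset call to the tokio runtime via future_into_py.
    fn set_virtual_ref<'py>(
        &self,
        py: Python<'py>,
        array_path: String,
        index: Vec<u32>,
        location: String,
        offset: u64,
        length: u64,
    ) -> PyResult<&'py PyAny> {
        let dataset = Arc::clone(&self.dataset);
        pyo3_asyncio::tokio::future_into_py(py, async move {
            dataset
                .write()
                .await
                .set_virtual_ref(array_path, index, location, offset, length)
                .await
                .map_err(PyValueError::new_err)
        })
    }
}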
Will virtual ref clients speak in the low level language of zarr keys? or in the higher level language of arrays and coordinates?
I didn't think too hard about this since I assume this will get deleted in favor of crunching through the references in bulk by next week hehe. As Tom points out, we should just accept a NodePath and three arrays (location, offset, length) and iterate & parse them on the rust side.
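Something along these lines, sketched with stand-in types (the bulk function, its signature, the ChunkIndices definition here, and the inner set_virtual_ref call are all assumptions; the real version would take a NodePath rather than a plain string):

// Stand-in types so the sketch is self-contained; the real icechunk signatures will differ.
struct ChunkIndices(Vec<u32>);

struct Dataset;

impl Dataset {
    async fn set_virtual_ref(
        &mut self,
        _array_path: &str,
        _index: ChunkIndices,
        _location: String,
        _offset: u64,
        _length: u64,
    ) -> Result<(), String> {
        Ok(())
    }
}

// Accept one array path plus parallel vectors of indices, locations, offsets and
// lengths, and do the per-chunk iteration on the rust side.
async fn set_virtual_refs_bulk(
    dataset: &mut Dataset,
    array_path: &str,
    indices: Vec<ChunkIndices>,
    locations: Vec<String>,
    offsets: Vec<u64>,
    lengths: Vec<u64>,
) -> Result<(), String> {
    for (((index, location), offset), length) in
        indices.into_iter().zip(locations).zip(offsets).zip(lengths)
    {
        dataset
            .set_virtual_ref(array_path, index, location, offset, length)
            .await?;
    }
    Ok(())
}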
Yes, definitely we can iterate in Rust to transfer a whole manifest of refs into icechunk. This function is only the first step.
* main:
  linter
  List operations fully realize results in memory
  Push sebas static list approach
  Update concurrency test to the latest rust api
  Sync main
  sync, test no longer runs
  Cleanup
  Use builtin tokio runtime
  Add a functional test for using a dataset with high concurrency
  Better timing and asserts
  Create a test that exercises the python store concurrently