diff --git a/.github/workflows/rust-ci.yaml b/.github/workflows/rust-ci.yaml
index e28ac46e..48843b67 100644
--- a/.github/workflows/rust-ci.yaml
+++ b/.github/workflows/rust-ci.yaml
@@ -81,6 +81,8 @@ jobs:
         env:
           AWS_ACCESS_KEY_ID: minio123
           AWS_SECRET_ACCESS_KEY: minio123
+          AWS_ALLOW_HTTP: 1
+          AWS_ENDPOINT_URL: http://localhost:9000
           AWS_DEFAULT_REGION: "us-east-1"
         run: |
           just pre-commit
diff --git a/Cargo.lock b/Cargo.lock
index f1546910..9dbf8d81 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -661,6 +661,7 @@ dependencies = [
  "test-strategy",
  "thiserror",
  "tokio",
+ "url",
 ]

 [[package]]
diff --git a/Justfile b/Justfile
index 1a0e65ee..ca2d6d75 100644
--- a/Justfile
+++ b/Justfile
@@ -3,7 +3,7 @@ alias pre := pre-commit

 # run all tests
 test *args='':
-    cargo test {{args}}
+    AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test {{args}}

 # compile but don't run all tests
 compile-tests *args='':
diff --git a/README.md b/README.md
index 0f55639d..1002903a 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,28 @@ s3_storage = Storage.s3_from_env(bucket="icechunk-test", prefix="oscar-demo-data
 store = await IcechunkStore.open(storage=storage, mode='r+')
 ```

+## Running Tests
+
+You will need [`docker compose`](https://docs.docker.com/compose/install/) and (optionally) [`just`](https://just.systems/).
+Once those are installed, first switch to the icechunk root directory, then start up a local minio server:
+```
+docker compose up -d
+```
+
+Use `just` to conveniently run a test
+```
+just test
+```
+
+This is just an alias for
+
+```
+AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test
+```
+
+> [!TIP]
+> For other aliases see [Justfile](./Justfile).
+
 ## Snapshots, Branches, and Tags

 Every update to an Icechunk store creates a new **snapshot** with a unique ID.
diff --git a/compose.yaml b/compose.yaml
index ec263e71..6034aac4 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -3,7 +3,7 @@ volumes:

 services:
   minio:
-    container_name: minio
+    container_name: icechunk_minio
     image: quay.io/minio/minio
     entrypoint: |
       /bin/sh -c '
diff --git a/icechunk/Cargo.toml b/icechunk/Cargo.toml
index 55175c8a..29a42f5d 100644
--- a/icechunk/Cargo.toml
+++ b/icechunk/Cargo.toml
@@ -25,6 +25,7 @@ base32 = "0.5.1"
 chrono = { version = "0.4.38", features = ["serde"] }
 async-recursion = "1.1.1"
 rmp-serde = "1.3.0"
+url = "2.5.2"
 async-stream = "0.3.5"

 [dev-dependencies]
diff --git a/icechunk/proptest-regressions/storage/virtual_ref.txt b/icechunk/proptest-regressions/storage/virtual_ref.txt
new file mode 100644
index 00000000..0fef7ac8
--- /dev/null
+++ b/icechunk/proptest-regressions/storage/virtual_ref.txt
@@ -0,0 +1,8 @@
+# Seeds for failure cases proptest has generated in the past. It is
+# automatically read and these particular cases re-run before any
+# novel cases are generated.
+#
+# It is recommended to check this file in to source control so that
+# everyone who runs the test benefits from these saved cases.
+cc 008bfdd8f25d36650994a353bcb0664c03e6089f6e81f7a18340520d82758346 # shrinks to input = _TestConstructValidByteRangeArgs
+cc 0bcff97f9e87d0f72d24e55229d8e6a18427e124755db10bc45e79cdbbac0af2 # shrinks to input = _TestPropertiesConstructValidByteRangeArgs { offset: 0, length: 1 }
diff --git a/icechunk/src/dataset.rs b/icechunk/src/dataset.rs
index c4854b82..f3f2146b 100644
--- a/icechunk/src/dataset.rs
+++ b/icechunk/src/dataset.rs
@@ -7,14 +7,21 @@ use std::{
     sync::Arc,
 };

+use crate::{
+    format::manifest::VirtualReferenceError,
+    storage::virtual_ref::{construct_valid_byte_range, VirtualChunkResolver},
+};
 pub use crate::{
-    format::{manifest::ChunkPayload, snapshot::ZarrArrayMetadata, ChunkIndices, Path},
+    format::{
+        manifest::{ChunkPayload, VirtualChunkLocation},
+        snapshot::ZarrArrayMetadata,
+        ChunkIndices, Path,
+    },
     metadata::{
         ArrayShape, ChunkKeyEncoding, ChunkShape, Codec, DataType, DimensionName,
         DimensionNames, FillValue, StorageTransformer, UserAttributes,
     },
 };
-
 use bytes::Bytes;
 use chrono::Utc;
 use futures::{future::ready, Future, FutureExt, Stream, StreamExt, TryStreamExt};
@@ -24,7 +31,9 @@ use tokio::task;

 use crate::{
     format::{
-        manifest::{ChunkInfo, ChunkRef, Manifest, ManifestExtents, ManifestRef},
+        manifest::{
+            ChunkInfo, ChunkRef, Manifest, ManifestExtents, ManifestRef, VirtualChunkRef,
+        },
         snapshot::{
             NodeData, NodeSnapshot, NodeType, Snapshot, SnapshotMetadata,
             SnapshotProperties, UserAttributesSnapshot,
@@ -35,6 +44,7 @@ use crate::{
         create_tag, fetch_branch_tip, fetch_tag, update_branch, BranchVersion, Ref,
         RefError,
     },
+    storage::virtual_ref::ObjectStoreVirtualChunkResolver,
     MemCachingStorage, Storage, StorageError,
 };

@@ -55,13 +65,14 @@ impl Default for DatasetConfig {
     }
 }

-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct Dataset {
     config: DatasetConfig,
     storage: Arc<dyn Storage + Send + Sync>,
     snapshot_id: ObjectId,
     last_node_id: Option<NodeId>,
     change_set: ChangeSet,
+    virtual_resolver: Arc<dyn VirtualChunkResolver + Send + Sync>,
 }

 #[derive(Clone, Debug, PartialEq, Default)]
@@ -257,6 +268,8 @@ pub enum DatasetError {
     Conflict { expected_parent: Option<ObjectId>, actual_parent: Option<ObjectId> },
     #[error("the dataset has been initialized already (default branch exists)")]
     AlreadyInitialized,
+    #[error("error when handling virtual reference {0}")]
+    VirtualReferenceError(#[from] VirtualReferenceError),
 }

 type DatasetResult<T> = Result<T, DatasetError>;
@@ -343,6 +356,7 @@ impl Dataset {
             storage,
             last_node_id: None,
             change_set: ChangeSet::default(),
+            virtual_resolver: Arc::new(ObjectStoreVirtualChunkResolver::default()),
         }
     }

@@ -680,8 +694,19 @@
             Some(ChunkPayload::Inline(bytes)) => {
                 Ok(Some(ready(Ok(byte_range.slice(bytes))).boxed()))
             }
-            //FIXME: implement virtual fetch
-            Some(ChunkPayload::Virtual(_)) => todo!(),
+            Some(ChunkPayload::Virtual(VirtualChunkRef { location, offset, length })) => {
+                let byte_range = construct_valid_byte_range(byte_range, offset, length);
+                let resolver = Arc::clone(&self.virtual_resolver);
+                Ok(Some(
+                    async move {
+                        resolver
+                            .fetch_chunk(&location, &byte_range)
+                            .await
+                            .map_err(|e| e.into())
+                    }
+                    .boxed(),
+                ))
+            }
             None => Ok(None),
         }
     }
diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs
index b7d79f7d..655bb45e 100644
--- a/icechunk/src/format/manifest.rs
+++ b/icechunk/src/format/manifest.rs
@@ -1,9 +1,14 @@
+use itertools::Itertools;
 use std::{collections::BTreeMap, ops::Bound, sync::Arc};
+use thiserror::Error;

 use bytes::Bytes;
 use serde::{Deserialize, Serialize};

-use super::{ChunkIndices, Flags, IcechunkFormatError, IcechunkResult, NodeId, ObjectId};
+use super::{
+    ChunkIndices, ChunkLength, ChunkOffset, Flags, IcechunkFormatError, IcechunkResult,
+    NodeId, ObjectId,
+};

 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ManifestExtents(pub Vec<ChunkIndices>);
@@ -15,18 +20,66 @@ pub struct ManifestRef {
     pub extents: ManifestExtents,
 }

+#[derive(Debug, Error)]
+pub enum VirtualReferenceError {
+    #[error("error parsing virtual ref URL {0}")]
+    CannotParseUrl(#[from] url::ParseError),
+    #[error("virtual reference has no path segments {0}")]
+    NoPathSegments(String),
+    #[error("unsupported scheme for virtual chunk refs: {0}")]
+    UnsupportedScheme(String),
+    #[error("error parsing bucket name from virtual ref URL {0}")]
+    CannotParseBucketName(String),
+    #[error("error fetching virtual reference {0}")]
+    FetchError(Box<dyn std::error::Error + Send + Sync>),
+    #[error("error parsing virtual reference {0}")]
+    OtherError(#[from] Box<dyn std::error::Error + Send + Sync>),
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+pub enum VirtualChunkLocation {
+    Absolute(String),
+    // Relative(prefix_id, String)
+}
+
+impl VirtualChunkLocation {
+    pub fn from_absolute_path(
+        path: &str,
+    ) -> Result<VirtualChunkLocation, VirtualReferenceError> {
+        // make sure we can parse the provided URL before creating the enum
+        // TODO: consider other validation here.
+        let url = url::Url::parse(path)?;
+        let new_path: String = url
+            .path_segments()
+            .ok_or(VirtualReferenceError::NoPathSegments(path.into()))?
+            // strip empty segments here, object_store cannot handle them.
+            .filter(|x| !x.is_empty())
+            .join("/");
+
+        let host = url
+            .host()
+            .ok_or_else(|| VirtualReferenceError::CannotParseBucketName(path.into()))?;
+        Ok(VirtualChunkLocation::Absolute(format!(
+            "{}://{}/{}",
+            url.scheme(),
+            host,
+            new_path,
+        )))
+    }
+}
+
 #[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct VirtualChunkRef {
-    location: String, // FIXME: better type
-    offset: u64,
-    length: u64,
+    pub location: VirtualChunkLocation,
+    pub offset: ChunkOffset,
+    pub length: ChunkLength,
 }

 #[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct ChunkRef {
     pub id: ObjectId,
-    pub offset: u64,
-    pub length: u64,
+    pub offset: ChunkOffset,
+    pub length: ChunkLength,
 }

 #[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs
index 18c890cc..ab520e08 100644
--- a/icechunk/src/format/mod.rs
+++ b/icechunk/src/format/mod.rs
@@ -105,6 +105,10 @@ impl ByteRange {
         Self(Bound::Included(offset), Bound::Unbounded)
     }

+    pub fn from_offset_with_length(offset: ChunkOffset, length: ChunkOffset) -> Self {
+        Self(Bound::Included(offset), Bound::Excluded(offset + length))
+    }
+
     pub fn to_offset(offset: ChunkOffset) -> Self {
         Self(Bound::Unbounded, Bound::Excluded(offset))
     }
@@ -113,6 +117,18 @@
         Self(Bound::Included(start), Bound::Excluded(end))
     }

+    pub fn length(&self) -> Option<u64> {
+        match (self.0, self.1) {
+            (_, Bound::Unbounded) => None,
+            (Bound::Unbounded, Bound::Excluded(end)) => Some(end),
+            (Bound::Unbounded, Bound::Included(end)) => Some(end + 1),
+            (Bound::Included(start), Bound::Excluded(end)) => Some(end - start),
+            (Bound::Excluded(start), Bound::Included(end)) => Some(end - start),
+            (Bound::Included(start), Bound::Included(end)) => Some(end - start + 1),
+            (Bound::Excluded(start), Bound::Excluded(end)) => Some(end - start - 1),
+        }
+    }
+
     pub const ALL: Self = Self(Bound::Unbounded, Bound::Unbounded);

     pub fn slice(&self, bytes: Bytes) -> Bytes {
diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs
index 3ef67915..dfce7f8a 100644
--- a/icechunk/src/storage/mod.rs
+++ b/icechunk/src/storage/mod.rs
@@ -12,6 +12,7 @@
 pub mod caching;
 pub mod logging;
 pub mod object_store;
+pub mod virtual_ref;

 pub use caching::MemCachingStorage;
 pub use object_store::ObjectStorage;
diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs
index 738d3bba..7e9985d0 100644
--- a/icechunk/src/storage/object_store.rs
+++ b/icechunk/src/storage/object_store.rs
@@ -1,20 +1,18 @@
-use core::fmt;
-use std::{
-    fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
+use crate::format::{
+    attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
+    ObjectId,
 };
-
 use async_trait::async_trait;
 use bytes::Bytes;
+use core::fmt;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use object_store::{
     local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, GetOptions,
     GetRange, ObjectStore, PutMode, PutOptions, PutPayload,
 };
 use serde::{Deserialize, Serialize};
-
-use crate::format::{
-    attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
-    ObjectId,
+use std::{
+    fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
 };

 use super::{Storage, StorageError, StorageResult};
diff --git a/icechunk/src/storage/virtual_ref.rs b/icechunk/src/storage/virtual_ref.rs
new file mode 100644
index 00000000..4e5f95a8
--- /dev/null
+++ b/icechunk/src/storage/virtual_ref.rs
@@ -0,0 +1,203 @@
+use crate::format::manifest::{VirtualChunkLocation, VirtualReferenceError};
+use crate::format::ByteRange;
+use async_trait::async_trait;
+use bytes::Bytes;
+use object_store::{
+    aws::AmazonS3Builder, path::Path as ObjectPath, GetOptions, GetRange, ObjectStore,
+};
+use std::cmp::{max, min};
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Bound;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use url;
+
+#[async_trait]
+pub trait VirtualChunkResolver: Debug {
+    async fn fetch_chunk(
+        &self,
+        location: &VirtualChunkLocation,
+        range: &ByteRange,
+    ) -> Result<Bytes, VirtualReferenceError>;
+}
+
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+struct StoreCacheKey(String, String);
+
+#[derive(Debug, Default)]
+pub struct ObjectStoreVirtualChunkResolver {
+    stores: RwLock<HashMap<StoreCacheKey, Arc<dyn ObjectStore>>>,
+}
+
+// Converts the requested ByteRange to a valid ByteRange appropriate
+// to the chunk reference of known `offset` and `length`.
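+// For example, a request for bytes [2, 5) of a chunk stored at offset 10 with
+// length 20 resolves to the absolute range [12, 15) within the containing object.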
+pub fn construct_valid_byte_range(
+    request: &ByteRange,
+    chunk_offset: u64,
+    chunk_length: u64,
+) -> ByteRange {
+    // TODO: error for offset<0
+    // TODO: error if request.start > offset + length
+    // FIXME: we allow creating a ByteRange(start, end) where end < start
+    let new_offset = match request.0 {
+        Bound::Unbounded => chunk_offset,
+        Bound::Included(start) => max(start, 0) + chunk_offset,
+        Bound::Excluded(start) => max(start, 0) + chunk_offset + 1,
+    };
+    request.length().map_or(
+        ByteRange(
+            Bound::Included(new_offset),
+            Bound::Excluded(chunk_offset + chunk_length),
+        ),
+        |reqlen| {
+            ByteRange(
+                Bound::Included(new_offset),
+                // no request can go past offset + length, so clamp it
+                Bound::Excluded(min(new_offset + reqlen, chunk_offset + chunk_length)),
+            )
+        },
+    )
+}
+
+#[async_trait]
+impl VirtualChunkResolver for ObjectStoreVirtualChunkResolver {
+    async fn fetch_chunk(
+        &self,
+        location: &VirtualChunkLocation,
+        range: &ByteRange,
+    ) -> Result<Bytes, VirtualReferenceError> {
+        let VirtualChunkLocation::Absolute(location) = location;
+        let parsed =
+            url::Url::parse(location).map_err(VirtualReferenceError::CannotParseUrl)?;
+        let bucket_name = parsed
+            .host_str()
+            .ok_or(VirtualReferenceError::CannotParseBucketName(
+                "error parsing bucket name".into(),
+            ))?
+            .to_string();
+        let path = ObjectPath::parse(parsed.path())
+            .map_err(|e| VirtualReferenceError::OtherError(Box::new(e)))?;
+        let scheme = parsed.scheme();
+        let cache_key = StoreCacheKey(scheme.into(), bucket_name);
+
+        let options =
+            GetOptions { range: Option::<GetRange>::from(range), ..Default::default() };
+        let store = {
+            let stores = self.stores.read().await;
+            #[allow(clippy::expect_used)]
+            stores.get(&cache_key).map(Arc::clone)
+        };
+        let store = match store {
+            Some(store) => store,
+            None => {
+                let builder = match scheme {
+                    // FIXME: allow configuring auth for virtual references
+                    "s3" => AmazonS3Builder::from_env(),
+                    _ => {
+                        Err(VirtualReferenceError::UnsupportedScheme(scheme.to_string()))?
+                    }
+                };
+                let new_store: Arc<dyn ObjectStore> = Arc::new(
+                    builder
+                        .with_bucket_name(&cache_key.1)
+                        .build()
+                        .map_err(|e| VirtualReferenceError::FetchError(Box::new(e)))?,
+                );
+                {
+                    self.stores
+                        .write()
+                        .await
+                        .insert(cache_key.clone(), Arc::clone(&new_store));
+                }
+                new_store
+            }
+        };
+        Ok(store
+            .get_opts(&path, options)
+            .await
+            .map_err(|e| VirtualReferenceError::FetchError(Box::new(e)))?
+            .bytes()
+            .await
+            .map_err(|e| VirtualReferenceError::FetchError(Box::new(e)))?)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use proptest::prop_assert_eq;
+    use test_strategy::proptest;
+
+    use super::*;
+
+    #[test]
+    fn test_virtual_chunk_location_bad() {
+        // errors relative chunk location
+        assert!(matches!(
+            VirtualChunkLocation::from_absolute_path("abcdef"),
+            Err(VirtualReferenceError::CannotParseUrl(_)),
+        ));
+        // extra / prevents bucket name detection
+        assert!(matches!(
+            VirtualChunkLocation::from_absolute_path("s3:///foo/path"),
+            Err(VirtualReferenceError::CannotParseBucketName(_)),
+        ));
+    }
+
+    #[proptest]
+    fn test_properties_construct_valid_byte_range(
+        #[strategy(0..10u64)] offset: u64,
+        // TODO: generate valid offsets using offset, length as input
+        #[strategy(3..100u64)] length: u64,
+        #[strategy(0..=2u64)] request_offset: u64,
+    ) {
+        // no request can go past this
+        let max_end = offset + length;
+
+        // TODO: more property tests:
+        // inputs: (1) chunk_ref: offset, length
+        //         (2) requested_range
+        // properties: output.length() <= actual_range.length()
+        //             output.length() == requested.length()
+        //             output.0 >= chunk_ref.offset
+        prop_assert_eq!(
+            construct_valid_byte_range(
+                &ByteRange(Bound::Included(0), Bound::Excluded(length)),
+                offset,
+                length,
+            ),
+            ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
+        );
+        prop_assert_eq!(
+            construct_valid_byte_range(
+                &ByteRange(Bound::Unbounded, Bound::Excluded(length)),
+                offset,
+                length
+            ),
+            ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
+        );
+        prop_assert_eq!(
+            construct_valid_byte_range(
+                &ByteRange(Bound::Included(request_offset), Bound::Excluded(max_end)),
+                offset,
+                length
+            ),
+            ByteRange(Bound::Included(request_offset + offset), Bound::Excluded(max_end))
+        );
+        prop_assert_eq!(
+            construct_valid_byte_range(&ByteRange::ALL, offset, length),
+            ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
+        );
+        prop_assert_eq!(
+            construct_valid_byte_range(
+                &ByteRange(Bound::Excluded(request_offset), Bound::Unbounded),
+                offset,
+                length
+            ),
+            ByteRange(
+                Bound::Included(offset + request_offset + 1),
+                Bound::Excluded(max_end)
+            )
+        );
+    }
+}
diff --git a/icechunk/src/strategies.rs b/icechunk/src/strategies.rs
index a1ed3f63..bcb4b385 100644
--- a/icechunk/src/strategies.rs
+++ b/icechunk/src/strategies.rs
@@ -18,19 +18,23 @@
 pub fn node_paths() -> impl Strategy<Value = Path> {
     any::<PathBuf>()
 }

-#[allow(clippy::expect_used)]
-pub fn empty_datasets() -> impl Strategy<Value = Dataset> {
+prop_compose! {
+    #[allow(clippy::expect_used)]
+    pub fn empty_datasets()(_id in any::()) -> Dataset {
+        // _id is used as a hack to avoid using prop_oneof![Just(dataset)]
+        // Using Just requires Dataset impl Clone, which we do not want
+        // FIXME: add storages strategy
     let storage = ObjectStorage::new_in_memory_store(None);
     let runtime =
         tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
-    let dataset = runtime.block_on(async {
+    runtime.block_on(async {
         Dataset::init(Arc::new(storage), false)
             .await
             .expect("Failed to initialize dataset")
             .build()
-    });
-    prop_oneof![Just(dataset)]
+    })
+}
 }

 pub fn codecs() -> impl Strategy<Value = Vec<Codec>> {
diff --git a/icechunk/src/zarr.rs b/icechunk/src/zarr.rs
index f3f35b58..00574309 100644
--- a/icechunk/src/zarr.rs
+++ b/icechunk/src/zarr.rs
@@ -21,11 +21,12 @@ use tokio::sync::RwLock;

 use crate::{
     dataset::{
-        get_chunk, ArrayShape, ChunkIndices, ChunkKeyEncoding, ChunkShape, Codec,
-        DataType, DatasetError, DimensionNames, FillValue, Path, StorageTransformer,
-        UserAttributes, ZarrArrayMetadata,
+        get_chunk, ArrayShape, ChunkIndices, ChunkKeyEncoding, ChunkPayload, ChunkShape,
+        Codec, DataType, DatasetError, DimensionNames, FillValue, Path,
+        StorageTransformer, UserAttributes, ZarrArrayMetadata,
     },
     format::{
+        manifest::VirtualChunkRef,
         snapshot::{NodeData, UserAttributesSnapshot},
         ByteRange, ChunkOffset, IcechunkFormatError,
     },
@@ -195,6 +196,8 @@ pub enum KeyNotFoundError {
 pub enum StoreError {
     #[error("invalid zarr key format `{key}`")]
     InvalidKey { key: String },
+    #[error("this operation is not allowed: {0}")]
+    NotAllowed(String),
     #[error("object not found: `{0}`")]
     NotFound(#[from] KeyNotFoundError),
     #[error("unsuccessful dataset operation: `{0}`")]
@@ -511,6 +514,36 @@ impl Store {
         }
     }

+    // alternate API would take array path, and a mapping from string coord to ChunkPayload
+    pub async fn set_virtual_ref(
+        &mut self,
+        key: &str,
+        reference: VirtualChunkRef,
+    ) -> StoreResult<()> {
+        if self.mode == AccessMode::ReadOnly {
+            return Err(StoreError::ReadOnly);
+        }
+
+        match Key::parse(key)? {
+            Key::Metadata { .. } => Err(StoreError::NotAllowed(format!(
+                "use .set to modify metadata for key {}",
+                key
+            ))),
+            Key::Chunk { node_path, coords } => {
+                self.dataset
+                    .write()
+                    .await
+                    .set_chunk_ref(
+                        node_path,
+                        coords,
+                        Some(ChunkPayload::Virtual(reference)),
+                    )
+                    .await?;
+                Ok(())
+            }
+        }
+    }
+
     pub async fn delete(&self, key: &str) -> StoreResult<()> {
         if self.mode == AccessMode::ReadOnly {
             return Err(StoreError::ReadOnly);
diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs
new file mode 100644
index 00000000..175ed90f
--- /dev/null
+++ b/icechunk/tests/test_virtual_refs.rs
@@ -0,0 +1,238 @@
+#[cfg(test)]
+#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used, clippy::expect_fun_call)]
+mod tests {
+    use icechunk::{
+        dataset::{get_chunk, ChunkPayload, ZarrArrayMetadata},
+        format::{
+            manifest::{VirtualChunkLocation, VirtualChunkRef},
+            ByteRange, ChunkIndices,
+        },
+        metadata::{ChunkKeyEncoding, ChunkShape, DataType, FillValue},
+        storage::{object_store::S3Credentials, ObjectStorage},
+        zarr::{AccessMode, ObjectId},
+        Dataset, Storage, Store,
+    };
+    use std::sync::Arc;
+    use std::{error::Error, num::NonZeroU64, path::PathBuf};
+
+    use bytes::Bytes;
+    use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
+    use pretty_assertions::assert_eq;
+
+    async fn create_minio_dataset() -> Dataset {
+        let storage: Arc<dyn Storage + Send + Sync> = Arc::new(
+            ObjectStorage::new_s3_store(
+                "testbucket".to_string(),
+                format!("{:?}", ObjectId::random()),
+                Some(S3Credentials {
+                    access_key_id: "minio123".into(),
+                    secret_access_key: "minio123".into(),
+                    session_token: None,
+                }),
+                Some("http://localhost:9000"),
+            )
+            .expect("Creating minio storage failed"),
+        );
+        Dataset::init(Arc::clone(&storage), true)
+            .await
+            .expect("building dataset failed")
+            .build()
+    }
+
+    async fn write_chunks_to_minio(chunks: impl Iterator<Item = (String, Bytes)>) {
+        use object_store::aws::AmazonS3Builder;
+        let bucket_name = "testbucket".to_string();
+        // TODO: Switch to PutMode::Create when object_store supports that
+        let opts = PutOptions { mode: PutMode::Overwrite, ..PutOptions::default() };
+
+        let store = AmazonS3Builder::new()
+            .with_access_key_id("minio123")
+            .with_secret_access_key("minio123")
+            .with_endpoint("http://localhost:9000")
+            .with_allow_http(true)
+            .with_bucket_name(bucket_name)
+            .build()
+            .expect("building S3 store failed");
+
+        for (path, bytes) in chunks {
+            store
+                .put_opts(
+                    &path.clone().into(),
+                    PutPayload::from_bytes(bytes.clone()),
+                    opts.clone(),
+                )
+                .await
+                .expect(&format!("putting chunk to {} failed", &path));
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_dataset_with_virtual_refs() -> Result<(), Box<dyn Error>> {
+        let bytes1 = Bytes::copy_from_slice(b"first");
+        let bytes2 = Bytes::copy_from_slice(b"second0000");
+        let chunks = [
+            ("/path/to/chunk-1".into(), bytes1.clone()),
+            ("/path/to/chunk-2".into(), bytes2.clone()),
+        ];
+        write_chunks_to_minio(chunks.iter().cloned()).await;
+
+        let mut ds = create_minio_dataset().await;
+
+        let zarr_meta = ZarrArrayMetadata {
+            shape: vec![1, 1, 2],
+            data_type: DataType::Int32,
+            chunk_shape: ChunkShape(vec![NonZeroU64::new(2).unwrap()]),
+            chunk_key_encoding: ChunkKeyEncoding::Slash,
+            fill_value: FillValue::Int32(0),
+            codecs: vec![],
+            storage_transformers: None,
+            dimension_names: None,
+        };
+        let payload1 = ChunkPayload::Virtual(VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path(&format!(
+                // intentional extra '/'
+                "s3://testbucket///{}",
+                chunks[0].0
+            ))?,
+            offset: 0,
+            length: 5,
+        });
+        let payload2 = ChunkPayload::Virtual(VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path(&format!(
+                "s3://testbucket/{}",
+                chunks[1].0,
+            ))?,
+            offset: 1,
+            length: 5,
+        });
+
+        let new_array_path: PathBuf = "/array".to_string().into();
+        ds.add_array(new_array_path.clone(), zarr_meta.clone()).await.unwrap();
+
+        ds.set_chunk_ref(
+            new_array_path.clone(),
+            ChunkIndices(vec![0, 0, 0]),
+            Some(payload1),
+        )
+        .await
+        .unwrap();
+        ds.set_chunk_ref(
+            new_array_path.clone(),
+            ChunkIndices(vec![0, 0, 1]),
+            Some(payload2),
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(
+            get_chunk(
+                ds.get_chunk_reader(
+                    &new_array_path,
+                    &ChunkIndices(vec![0, 0, 0]),
+                    &ByteRange::ALL
+                )
+                .await
+                .unwrap()
+            )
+            .await
+            .unwrap(),
+            Some(bytes1.clone()),
+        );
+        assert_eq!(
+            get_chunk(
+                ds.get_chunk_reader(
+                    &new_array_path,
+                    &ChunkIndices(vec![0, 0, 1]),
+                    &ByteRange::ALL
+                )
+                .await
+                .unwrap()
+            )
+            .await
+            .unwrap(),
+            Some(Bytes::copy_from_slice(&bytes2[1..6])),
+        );
+
+        for range in [
+            ByteRange::bounded(0u64, 3u64),
+            ByteRange::from_offset(2u64),
+            ByteRange::to_offset(4u64),
+        ] {
+            assert_eq!(
+                get_chunk(
+                    ds.get_chunk_reader(
+                        &new_array_path,
+                        &ChunkIndices(vec![0, 0, 0]),
+                        &range
+                    )
+                    .await
+                    .unwrap()
+                )
+                .await
+                .unwrap(),
+                Some(range.slice(bytes1.clone()))
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_zarr_store_virtual_refs_set_and_get(
+    ) -> Result<(), Box<dyn Error>> {
+        let bytes1 = Bytes::copy_from_slice(b"first");
+        let bytes2 = Bytes::copy_from_slice(b"second0000");
+        let chunks = [
+            ("/path/to/chunk-1".into(), bytes1.clone()),
+            ("/path/to/chunk-2".into(), bytes2.clone()),
+        ];
+        write_chunks_to_minio(chunks.iter().cloned()).await;
+
+        let ds = create_minio_dataset().await;
+        let mut store = Store::from_dataset(
+            ds,
+            AccessMode::ReadWrite,
+            Some("main".to_string()),
+            None,
+        );
+
+        store
+            .set(
+                "zarr.json",
+                Bytes::copy_from_slice(br#"{"zarr_format":3, "node_type":"group"}"#),
+            )
+            .await?;
+        let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#);
+        store.set("array/zarr.json", zarr_meta.clone()).await?;
+        assert_eq!(
+            store.get("array/zarr.json", &ByteRange::ALL).await.unwrap(),
+            zarr_meta
+        );
+
+        let ref1 = VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path(&format!(
+                // intentional extra '/'
+                "s3://testbucket///{}",
+                chunks[0].0
+            ))?,
+            offset: 0,
+            length: 5,
+        };
+        let ref2 = VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path(&format!(
+                "s3://testbucket/{}",
+                chunks[1].0
+            ))?,
+            offset: 1,
+            length: 5,
+        };
+        store.set_virtual_ref("array/c/0/0/0", ref1).await?;
+        store.set_virtual_ref("array/c/0/0/1", ref2).await?;
+
+        assert_eq!(store.get("array/c/0/0/0", &ByteRange::ALL).await?, bytes1,);
+        assert_eq!(
+            store.get("array/c/0/0/1", &ByteRange::ALL).await?,
+            Bytes::copy_from_slice(&bytes2[1..6]),
+        );
+        Ok(())
+    }
+}