Add virtual ref support EAR-1183 #85

Merged · 41 commits · Sep 26, 2024

Changes from 28 commits are shown in the diff below.

Commits (all by dcherian):
9c00af0  [WIP] Add virtual ref support (Sep 20, 2024)
72c1096  Better test (Sep 20, 2024)
c8ec4cb  wip (Sep 20, 2024)
bcfe671  Review comments (Sep 23, 2024)
ed18dd9  Arc instead of Box (Sep 23, 2024)
f1e05c7  Merge branch 'main' into virtual-refs (Sep 23, 2024)
8ae0427  Move trait to own file (Sep 23, 2024)
4f52f1d  Set-time checking (Sep 23, 2024)
6133859  lint (Sep 23, 2024)
7beb82c  Merge branch 'main' into virtual-refs (Sep 23, 2024)
2dccd6c  Cleanup (Sep 23, 2024)
7071039  Better locking (Sep 24, 2024)
a6403d1  Move more code to virtual_ref.rs (Sep 24, 2024)
f35cfd4  Some more cleanup (Sep 24, 2024)
d21231e  permute(Box, Send, Error, dyn, Sync) (Sep 24, 2024)
445cb44  lint (Sep 24, 2024)
427796c  Better name (Sep 24, 2024)
9ba6607  Use VirtualChunkResolver trait (Sep 24, 2024)
ac9e5b5  Merge branch 'main' into virtual-refs (Sep 24, 2024)
5954991  Use unwrap to debug (Sep 24, 2024)
14533e8  Specify Debug on trait (Sep 24, 2024)
e9825ef  tighter types (Sep 24, 2024)
0ca2f9e  lint (Sep 24, 2024)
ee73ca5  `Store.set_virtual_ref` (Sep 24, 2024)
f8d8ae2  Move virtual refs tests to tests/ (Sep 24, 2024)
89f9aee  print debugging (Sep 24, 2024)
835f5af  set all env vars (Sep 24, 2024)
3bdafe1  remove dbg! (Sep 24, 2024)
0ef8c8f  Cleanup (Sep 25, 2024)
60a14ef  Update set_virtual_ref signature (Sep 25, 2024)
c7ec230  Better byte_range handling (Sep 25, 2024)
4bd3176  Merge remote-tracking branch 'origin/main' into virtual-refs (Sep 25, 2024)
fa8fde5  Beter test (Sep 25, 2024)
208d1ce  cleanup meatdata error message (Sep 25, 2024)
d6241f4  Better Error (Sep 25, 2024)
2f7c479  lint (Sep 25, 2024)
a0bb597  update justfile with envvars (Sep 25, 2024)
3f625a0  fix parallelism (Sep 26, 2024)
e402d42  cleanup (Sep 26, 2024)
0785687  Update README (Sep 26, 2024)
66d361d  Merge branch 'main' into virtual-refs (Sep 26, 2024)
Files changed:
2 changes: 2 additions & 0 deletions .github/workflows/rust-ci.yaml
@@ -81,6 +81,8 @@ jobs:
env:
AWS_ACCESS_KEY_ID: minio123
AWS_SECRET_ACCESS_KEY: minio123
AWS_ALLOW_HTTP: 1
AWS_ENDPOINT_URL: http://localhost:9000
AWS_DEFAULT_REGION: "us-east-1"
run: |
just pre-commit
1 change: 1 addition & 0 deletions Cargo.lock
(Generated lockfile; diff not rendered by default.)
2 changes: 1 addition & 1 deletion compose.yaml
@@ -3,7 +3,7 @@ volumes:

services:
minio:
container_name: minio
container_name: icechunk_minio
image: quay.io/minio/minio
entrypoint: |
/bin/sh -c '
1 change: 1 addition & 0 deletions icechunk/Cargo.toml
@@ -25,6 +25,7 @@ base32 = "0.5.1"
chrono = { version = "0.4.38", features = ["serde"] }
async-recursion = "1.1.1"
rmp-serde = "1.3.0"
url = "2.5.2"

[dev-dependencies]
pretty_assertions = "1.4.1"
31 changes: 25 additions & 6 deletions icechunk/src/dataset.rs
@@ -6,14 +6,18 @@ use std::{
sync::Arc,
};

use crate::storage::virtual_ref::VirtualChunkResolver;
pub use crate::{
format::{manifest::ChunkPayload, snapshot::ZarrArrayMetadata, ChunkIndices, Path},
format::{
manifest::{ChunkPayload, VirtualChunkLocation},
snapshot::ZarrArrayMetadata,
ChunkIndices, Path,
},
metadata::{
ArrayShape, ChunkKeyEncoding, ChunkShape, Codec, DataType, DimensionName,
DimensionNames, FillValue, StorageTransformer, UserAttributes,
},
};

use bytes::Bytes;
use chrono::Utc;
use futures::{future::ready, Stream, StreamExt, TryStreamExt};
@@ -23,7 +27,9 @@ use tokio::task;

use crate::{
format::{
manifest::{ChunkInfo, ChunkRef, Manifest, ManifestExtents, ManifestRef},
manifest::{
ChunkInfo, ChunkRef, Manifest, ManifestExtents, ManifestRef, VirtualChunkRef,
},
snapshot::{
NodeData, NodeSnapshot, NodeType, Snapshot, SnapshotMetadata,
SnapshotProperties, UserAttributesSnapshot,
@@ -34,6 +40,7 @@ use crate::{
create_tag, fetch_branch_tip, fetch_tag, update_branch, BranchVersion, Ref,
RefError,
},
storage::virtual_ref::ObjectStoreVirtualChunkResolver,
MemCachingStorage, Storage, StorageError,
};

@@ -54,13 +61,14 @@ impl Default for DatasetConfig {
}
}

#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Dataset {
config: DatasetConfig,
storage: Arc<dyn Storage + Send + Sync>,
snapshot_id: ObjectId,
last_node_id: Option<NodeId>,
change_set: ChangeSet,
virtual_resolver: Arc<dyn VirtualChunkResolver + Send + Sync>,
}

#[derive(Clone, Debug, PartialEq, Default)]
@@ -342,6 +350,7 @@ impl Dataset {
storage,
last_node_id: None,
change_set: ChangeSet::default(),
virtual_resolver: Arc::new(ObjectStoreVirtualChunkResolver::default()),
}
}

@@ -645,8 +654,18 @@ impl Dataset {
Ok(self.storage.fetch_chunk(&id, byte_range).await.map(Some)?)
}
Some(ChunkPayload::Inline(bytes)) => Ok(Some(byte_range.slice(bytes))),
//FIXME: implement virtual fetch
Some(ChunkPayload::Virtual(_)) => todo!(),
Some(ChunkPayload::Virtual(VirtualChunkRef { location, offset, length })) => {
Ok(self
.virtual_resolver
.fetch_chunk(
&location,
&byte_range.replace_unbounded_with(
&ByteRange::from_offset_to_length(offset, length),
),
)
.await
.map(Some)?)
}

Reviewer (Collaborator): I think this is good at the type level but wrong semantically, which impacts performance. The idea behind this get_chunk_reader design is that we want to hold the reference to Self for as little time as possible. We don't want to hold it the whole time the bytes are being downloaded from S3. If we did, nobody could write a new reference into the dataset until the download finished, even though that is a completely independent operation.

So the way it works is: this function does the minimum it needs to do while holding the ref, and then returns a future that does the actual work of downloading bytes when awaited. If you look above at the materialized case, we only resolve the chunk_ref and clone a pointer; that's enough to set up the Future we return.

You should do something similar: clone your virtual resolver and move it into a Future that you return without awaiting.

Author (dcherian): Yes, I was being dumb, sorry. Thanks for taking the time to write it out.

None => Ok(None),
}
}
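
As a minimal sketch, the pattern the reviewer describes (do the cheap work under the borrow, then return an owning future) might look like the following. The free-standing function and its name are illustrative, not the exact icechunk API; import paths follow the modules in this diff.

use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
use icechunk::format::manifest::VirtualChunkLocation;
use icechunk::format::ByteRange;
use icechunk::storage::virtual_ref::VirtualChunkResolver;
use icechunk::storage::StorageResult;

// `resolver` stands in for the Dataset's virtual_resolver field from the
// diff above.
fn virtual_chunk_reader(
    resolver: &Arc<dyn VirtualChunkResolver + Send + Sync>,
    location: VirtualChunkLocation,
    byte_range: ByteRange,
) -> impl Future<Output = StorageResult<Bytes>> + 'static {
    // Cheap, synchronous work while the reference is held: clone the Arc.
    let resolver = Arc::clone(resolver);
    // The returned future owns its inputs, so the original borrow ends as
    // soon as this function returns; the slow S3 download happens only when
    // the caller awaits.
    async move { resolver.fetch_chunk(&location, &byte_range).await }
}

Because the future owns an Arc clone of the resolver plus its arguments, writers to the dataset are never blocked while bytes are in flight.
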
63 changes: 57 additions & 6 deletions icechunk/src/format/manifest.rs
@@ -1,9 +1,14 @@
use itertools::Itertools;
use std::{collections::BTreeMap, ops::Bound, sync::Arc};
use thiserror::Error;

use bytes::Bytes;
use serde::{Deserialize, Serialize};

use super::{ChunkIndices, Flags, IcechunkFormatError, IcechunkResult, NodeId, ObjectId};
use super::{
ChunkIndices, ChunkLength, ChunkOffset, Flags, IcechunkFormatError, IcechunkResult,
NodeId, ObjectId,
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestExtents(pub Vec<ChunkIndices>);
@@ -15,18 +20,64 @@ pub struct ManifestRef {
pub extents: ManifestExtents,
}

#[derive(Debug, Error)]
pub enum VirtualReferenceError {
#[error("error parsing virtual ref URL {0}")]
CannotParseUrl(#[from] url::ParseError),
#[error("virtual reference has no path segments {0}")]
NoPathSegments(String),
#[error("unsupported scheme for virtual chunk refs: {0}")]
UnsupportedScheme(String),
#[error("error parsing bucket name from virtual ref URL {0}")]
CannotParseBucketName(String),
#[error("error parsing virtual reference {0}")]
OtherError(#[from] Box<dyn std::error::Error + Send + Sync>),
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum VirtualChunkLocation {
Absolute(String),
// Relative(prefix_id, String)
}

impl VirtualChunkLocation {
pub fn from_absolute_path(
path: &str,
) -> Result<VirtualChunkLocation, VirtualReferenceError> {
// make sure we can parse the provided URL before creating the enum
// TODO: consider other validation here.
let url = url::Url::parse(path)?;
let new_path: String = url
.path_segments()
.ok_or(VirtualReferenceError::NoPathSegments(path.into()))?
// strip empty segments here, object_store cannot handle them.
.filter(|x| !x.is_empty())
.join("/");

let host = url
.host()
.ok_or_else(|| VirtualReferenceError::CannotParseBucketName(path.into()))?;
Ok(VirtualChunkLocation::Absolute(format!(
"{}://{}/{}",
url.scheme(),
host,
new_path,
)))
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct VirtualChunkRef {
location: String, // FIXME: better type
offset: u64,
length: u64,
pub location: VirtualChunkLocation,
pub offset: ChunkOffset,
pub length: ChunkLength,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ChunkRef {
pub id: ObjectId,
pub offset: u64,
pub length: u64,
pub offset: ChunkOffset,
pub length: ChunkLength,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
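
A usage sketch for the new VirtualChunkLocation::from_absolute_path; the expected values are inferred from the implementation above, and the bucket and key are made-up examples.

use icechunk::format::manifest::{VirtualChunkLocation, VirtualReferenceError};

fn main() -> Result<(), VirtualReferenceError> {
    // Empty path segments are stripped because object_store cannot handle them.
    let loc =
        VirtualChunkLocation::from_absolute_path("s3://some-bucket//foo//chunk-0.nc")?;
    assert_eq!(
        loc,
        VirtualChunkLocation::Absolute("s3://some-bucket/foo/chunk-0.nc".to_string())
    );

    // A URL with no host cannot name a bucket and should be rejected.
    assert!(VirtualChunkLocation::from_absolute_path("file:///tmp/chunk").is_err());
    Ok(())
}
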
15 changes: 15 additions & 0 deletions icechunk/src/format/mod.rs
@@ -105,6 +105,10 @@ impl ByteRange {
Self(Bound::Included(offset), Bound::Unbounded)
}

pub fn from_offset_to_length(offset: ChunkOffset, length: ChunkOffset) -> Self {
Self(Bound::Included(offset), Bound::Excluded(offset + length))
}

pub fn to_offset(offset: ChunkOffset) -> Self {
Self(Bound::Unbounded, Bound::Excluded(offset))
}
@@ -115,6 +119,17 @@

pub const ALL: Self = Self(Bound::Unbounded, Bound::Unbounded);

pub fn replace_unbounded_with(&self, other: &ByteRange) -> Self {
Copy link
Contributor Author

@dcherian dcherian Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name should be clearer now

let first = match self.0 {
Bound::Unbounded => other.0,
_ => self.0,
};
let second = match self.1 {
Bound::Unbounded => other.1,
_ => self.1,
};
ByteRange(first, second)
}

pub fn slice(&self, bytes: Bytes) -> Bytes {
match (self.0, self.1) {
(Bound::Included(start), Bound::Excluded(end)) => {
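
For concreteness, a sketch of how the two new ByteRange helpers combine in the virtual-chunk read path in dataset.rs above. It assumes ByteRange's tuple fields are public and that it derives PartialEq and Debug, as elsewhere in the crate.

use std::ops::Bound;

use icechunk::format::ByteRange;

fn main() {
    // The chunk occupies bytes [100, 150) of the underlying object.
    let chunk_extent = ByteRange::from_offset_to_length(100, 50);

    // A caller asks for "byte 110 to the end"; the unbounded end falls back
    // to the chunk's extent rather than the end of the whole object.
    let request = ByteRange(Bound::Included(110), Bound::Unbounded);
    let resolved = request.replace_unbounded_with(&chunk_extent);
    assert_eq!(resolved, ByteRange(Bound::Included(110), Bound::Excluded(150)));

    // A fully unbounded request resolves to the whole chunk.
    assert_eq!(ByteRange::ALL.replace_unbounded_with(&chunk_extent), chunk_extent);
}
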
9 changes: 7 additions & 2 deletions icechunk/src/storage/mod.rs
@@ -12,13 +12,16 @@ pub mod caching;
pub mod logging;

pub mod object_store;
pub mod virtual_ref;

pub use caching::MemCachingStorage;
pub use object_store::ObjectStorage;

use crate::format::{
attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
ObjectId, Path,
attributes::AttributesTable,
manifest::{Manifest, VirtualReferenceError},
snapshot::Snapshot,
ByteRange, ObjectId, Path,
};

#[derive(Debug, Error)]
@@ -37,6 +40,8 @@ pub enum StorageError {
RefAlreadyExists(String),
#[error("ref not found: {0}")]
RefNotFound(String),
#[error("error parsing virtual reference {0}")]
VirtualReferenceError(#[from] VirtualReferenceError),
#[error("generic storage error: {0}")]
OtherError(#[from] Arc<dyn std::error::Error + Sync + Send>),
#[error("unknown storage error: {0}")]
14 changes: 6 additions & 8 deletions icechunk/src/storage/object_store.rs
@@ -1,19 +1,17 @@
use core::fmt;
use std::{
fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
use crate::format::{
attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
ObjectId,
};

use async_trait::async_trait;
use bytes::Bytes;
use core::fmt;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use object_store::{
local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, GetOptions,
GetRange, ObjectStore, PutMode, PutOptions, PutPayload,
};

use crate::format::{
attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
ObjectId,
use std::{
fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
};

use super::{Storage, StorageError, StorageResult};
82 changes: 82 additions & 0 deletions icechunk/src/storage/virtual_ref.rs
@@ -0,0 +1,82 @@
use crate::format::manifest::{VirtualChunkLocation, VirtualReferenceError};
use crate::format::ByteRange;
use crate::storage::StorageResult;
use async_trait::async_trait;
use bytes::Bytes;
use object_store::{
aws::AmazonS3Builder, path::Path as ObjectPath, GetOptions, GetRange, ObjectStore,
};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::RwLock;
use url;

#[async_trait]
pub trait VirtualChunkResolver: Debug {
async fn fetch_chunk(
&self,
location: &VirtualChunkLocation,
range: &ByteRange,
) -> StorageResult<Bytes>;
}

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
struct StoreCacheKey(String, String);

#[derive(Debug, Default)]
pub struct ObjectStoreVirtualChunkResolver {
stores: Arc<RwLock<HashMap<StoreCacheKey, Arc<dyn ObjectStore>>>>,
}

#[async_trait]
impl VirtualChunkResolver for ObjectStoreVirtualChunkResolver {
async fn fetch_chunk(
&self,
location: &VirtualChunkLocation,
range: &ByteRange,
) -> StorageResult<Bytes> {
let VirtualChunkLocation::Absolute(location) = location;
let parsed =
url::Url::parse(location).map_err(VirtualReferenceError::CannotParseUrl)?;
let bucket_name = parsed
.host_str()
.ok_or(VirtualReferenceError::CannotParseBucketName(
"error parsing bucket name".into(),
))?
.to_string();
let path = ObjectPath::parse(parsed.path())
.map_err(|e| VirtualReferenceError::OtherError(Box::new(e)))?;
let scheme = parsed.scheme();
let cache_key = StoreCacheKey(scheme.into(), bucket_name);

let options =
GetOptions { range: Option::<GetRange>::from(range), ..Default::default() };
let store = {
let stores = self.stores.read().await;
#[allow(clippy::expect_used)]
stores.get(&cache_key).map(Arc::clone)
};
let store = match store {
Some(store) => store,
None => {
let builder = match scheme {
"s3" => AmazonS3Builder::from_env(),
_ => {
Err(VirtualReferenceError::UnsupportedScheme(scheme.to_string()))?
}
};
let new_store: Arc<dyn ObjectStore> =
Arc::new(builder.with_bucket_name(&cache_key.1).build()?);
{
self.stores
.write()
.await
.insert(cache_key.clone(), Arc::clone(&new_store));
}
new_store
}
};
Ok(store.get_opts(&path, options).await?.bytes().await?)
}
}
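
A hypothetical end-to-end usage sketch of the resolver. The bucket, key, and tokio dependency are assumptions; credentials and the endpoint come from the environment via AmazonS3Builder::from_env, as in the CI workflow near the top of this diff.

use icechunk::format::manifest::VirtualChunkLocation;
use icechunk::format::ByteRange;
use icechunk::storage::virtual_ref::{
    ObjectStoreVirtualChunkResolver, VirtualChunkResolver,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resolver = ObjectStoreVirtualChunkResolver::default();
    let location =
        VirtualChunkLocation::from_absolute_path("s3://testbucket/data/chunk-0-0")?;

    // The first fetch for a (scheme, bucket) pair builds an object_store
    // client and caches it behind the RwLock; later fetches reuse it.
    let bytes = resolver.fetch_chunk(&location, &ByteRange::ALL).await?;
    println!("fetched {} bytes", bytes.len());
    Ok(())
}

Note that the cache is racy in a benign way: the read lock is released before a missing client is built, so two concurrent first fetches for the same bucket may both construct a client; the second insert simply overwrites the first, and either client works.
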
14 changes: 9 additions & 5 deletions icechunk/src/strategies.rs
@@ -18,19 +18,23 @@ pub fn node_paths() -> impl Strategy<Value = Path> {
any::<PathBuf>()
}

#[allow(clippy::expect_used)]
pub fn empty_datasets() -> impl Strategy<Value = Dataset> {
prop_compose! {
#[allow(clippy::expect_used)]
pub fn empty_datasets()(_id in any::<u32>()) -> Dataset {

Author (dcherian): I guess there's a better solution with Arc but let's do that later if we need to.


// _id is used as a hack to avoid using prop_oneof![Just(dataset)]
// Using Just requires Dataset impl Clone, which we do not want

// FIXME: add storages strategy
let storage = ObjectStorage::new_in_memory_store(None);
let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");

let dataset = runtime.block_on(async {
runtime.block_on(async {
Dataset::init(Arc::new(storage), false)
.await
.expect("Failed to initialize dataset")
.build()
});
prop_oneof![Just(dataset)]
})
}
}

pub fn codecs() -> impl Strategy<Value = Vec<Codec>> {
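
A sketch of how these strategies might be consumed in a property test. The test is illustrative only and not part of this PR; it assumes the strategies module is public and relies on the Debug derive on Dataset from the dataset.rs diff above.

use proptest::prelude::*;

use icechunk::strategies::{empty_datasets, node_paths};

proptest! {
    #[test]
    fn prop_dataset_debug_is_nonempty(dataset in empty_datasets(), _path in node_paths()) {
        // Each case gets a Dataset backed by a fresh in-memory store,
        // built via its own tokio runtime as shown above.
        prop_assert!(!format!("{dataset:?}").is_empty());
    }
}
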