Skip to content

Commit

Permalink
Set-time checking
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherian committed Sep 23, 2024
1 parent 8ae0427 commit 4f52f1d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 23 deletions.
33 changes: 25 additions & 8 deletions icechunk/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use std::{

use crate::storage::virtual_ref::VirtualChunkResolver;
pub use crate::{
format::{manifest::ChunkPayload, snapshot::ZarrArrayMetadata, ChunkIndices, Path},
format::{
manifest::{ChunkPayload, VirtualChunkLocation},
snapshot::ZarrArrayMetadata,
ChunkIndices, Path,
},
metadata::{
ArrayShape, ChunkKeyEncoding, ChunkShape, Codec, DataType, DimensionName,
DimensionNames, FillValue, StorageTransformer, UserAttributes,
Expand Down Expand Up @@ -150,7 +154,6 @@ impl ChangeSet {
coord: ChunkIndices,
data: Option<ChunkPayload>,
) {
// FIXME: normalize virtual chunk reference paths here
// this implementation makes delete idempotent
// it allows deleting a deleted chunk by repeatedly setting None.
self.set_chunks
Expand Down Expand Up @@ -1146,7 +1149,9 @@ mod tests {
use std::{error::Error, num::NonZeroU64, path::PathBuf};

use crate::{
format::manifest::{ChunkInfo, VirtualChunkLocation, VirtualChunkRef},
format::manifest::{
ChunkInfo, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
},
metadata::{
ChunkKeyEncoding, ChunkShape, Codec, DataType, FillValue, StorageTransformer,
},
Expand Down Expand Up @@ -1326,22 +1331,34 @@ mod tests {
dimension_names: None,
};
let payload1 = ChunkPayload::Virtual(VirtualChunkRef {
location: VirtualChunkLocation::Absolute(format!(
"s3://testbucket/{}",
location: VirtualChunkLocation::from_absolute_path(&format!(
// intentional extra '/'
"s3://testbucket///{}",
path1
)),
))?,
offset: 0,
length: 5,
});
let payload2 = ChunkPayload::Virtual(VirtualChunkRef {
location: VirtualChunkLocation::Absolute(format!(
location: VirtualChunkLocation::from_absolute_path(&format!(
"s3://testbucket/{}",
path2
)),
))?,
offset: 1,
length: 5,
});

// bad relative chunk location
assert!(matches!(
VirtualChunkLocation::from_absolute_path("abcdef"),
Err(VirtualReferenceError::CannotParseUrl(_)),
));
// extra / prevents bucket name detection
assert!(matches!(
VirtualChunkLocation::from_absolute_path("s3:///foo/path"),
Err(VirtualReferenceError::CannotParseBucketName(_)),
));

let new_array_path: PathBuf = "/array".to_string().into();
ds.add_array(new_array_path.clone(), zarr_meta.clone()).await?;

Expand Down
43 changes: 43 additions & 0 deletions icechunk/src/format/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use itertools::Itertools;
use std::{collections::BTreeMap, ops::Bound, sync::Arc};
use thiserror::Error;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
Expand All @@ -15,12 +17,53 @@ pub struct ManifestRef {
pub extents: ManifestExtents,
}

/// Errors produced while validating or resolving a virtual chunk reference.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; the
/// variants marked `#[from]` also get a `From` conversion so `?` can be used
/// directly on the wrapped error type.
#[derive(Debug, Error)]
pub enum VirtualReferenceError {
/// The reference string could not be parsed as a URL (wraps the parser's error).
#[error("error parsing virtual ref URL {0}")]
CannotParseUrl(#[from] url::ParseError),
/// The parsed URL exposes no path segments; carries the offending input string.
#[error("virtual reference has no path segments {0}")]
NoPathSegments(String),
/// The URL scheme is not one a resolver knows how to build a store for;
/// carries the scheme string.
#[error("unsupported scheme for virtual chunk refs: {0}")]
UnsupportedScheme(String),
/// The URL has no host component, so no bucket name can be derived from it.
#[error("error parsing bucket name from virtual ref URL {0}")]
CannotParseBucketName(String),
/// The URL path was rejected by `object_store`'s path parser (wraps its error).
#[error("error parsing path using object_store {0}")]
CannotParsePath(#[from] ::object_store::path::Error),
}
/// Where the bytes of a virtual chunk live.
///
/// Only absolute locations exist today; a relative form is sketched in the
/// comment below but not implemented.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum VirtualChunkLocation {
/// An absolute location stored as a URL string. When built through
/// `VirtualChunkLocation::from_absolute_path` the string has the form
/// `scheme://host/path` with empty path segments removed.
Absolute(String),
// Relative(prefix_id, String)
}

impl VirtualChunkLocation {
    /// Validates and normalizes a URL string into a
    /// [`VirtualChunkLocation::Absolute`].
    ///
    /// The input is parsed with `url::Url::parse`. Empty path segments (for
    /// example those produced by repeated `/`) are dropped because
    /// `object_store` cannot handle them, and the location is rebuilt as
    /// `scheme://host/segment/segment...`. Only the scheme, host and path of
    /// the parsed URL are carried into the stored string.
    ///
    /// # Errors
    ///
    /// * [`VirtualReferenceError::CannotParseUrl`] if `path` is not a valid URL.
    /// * [`VirtualReferenceError::NoPathSegments`] if the URL exposes no path
    ///   segments.
    /// * [`VirtualReferenceError::CannotParseBucketName`] if the URL has no host.
    ///
    /// The checks run in the order listed, so a URL that has neither path
    /// segments nor a host reports `NoPathSegments`.
    pub fn from_absolute_path(
        path: &str,
    ) -> Result<VirtualChunkLocation, VirtualReferenceError> {
        // make sure we can parse the provided URL before creating the enum
        // TODO: consider other validation here.
        let url = url::Url::parse(path)?;

        let new_path: String = url
            .path_segments()
            // Build the error lazily so the success path does not allocate.
            .ok_or_else(|| VirtualReferenceError::NoPathSegments(path.into()))?
            // strip empty segments here, object_store cannot handle them.
            .filter(|segment| !segment.is_empty())
            .join("/");

        // Matching on the host directly avoids a separate `has_host()` check
        // followed by an `expect`, so no panic path (or lint allow) is needed.
        match url.host() {
            Some(host) => Ok(VirtualChunkLocation::Absolute(format!(
                "{}://{}/{}",
                url.scheme(),
                host,
                new_path,
            ))),
            None => Err(VirtualReferenceError::CannotParseBucketName(path.into())),
        }
    }
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct VirtualChunkRef {
pub location: VirtualChunkLocation,
Expand Down
16 changes: 6 additions & 10 deletions icechunk/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ pub use caching::MemCachingStorage;
pub use object_store::ObjectStorage;

use crate::format::{
attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, ByteRange,
ObjectId, Path,
attributes::AttributesTable,
manifest::{Manifest, VirtualReferenceError},
snapshot::Snapshot,
ByteRange, ObjectId, Path,
};

#[derive(Debug, Error)]
Expand All @@ -38,14 +40,8 @@ pub enum StorageError {
RefAlreadyExists(String),
#[error("ref not found: {0}")]
RefNotFound(String),
#[error("error parsing virtual ref URL {0}")]
VirtualUrlError(#[from] url::ParseError),
#[error("error parsing bucket name from virtual ref URL {0}")]
VirtualBucketParseError(String),
#[error("error parsing path using object_store {0}")]
VirtualPathParseError(#[from] ::object_store::path::Error),
#[error("unsupported scheme for virtual chunk refs: {0}")]
UnsupportedScheme(String),
#[error("error parsing virtual reference {0}")]
VirtualReferenceError(#[from] VirtualReferenceError),
#[error("generic storage error: {0}")]
OtherError(#[from] Arc<dyn std::error::Error + Sync + Send>),
#[error("unknown storage error: {0}")]
Expand Down
14 changes: 9 additions & 5 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use crate::format::{
attributes::AttributesTable,
manifest::{Manifest, VirtualChunkLocation},
manifest::{Manifest, VirtualChunkLocation, VirtualReferenceError},
snapshot::Snapshot,
ByteRange, ObjectId,
};
Expand Down Expand Up @@ -381,14 +381,16 @@ impl VirtualChunkResolver for ObjectStoreVirtualChunkResolver {
range: &ByteRange,
) -> StorageResult<Bytes> {
let VirtualChunkLocation::Absolute(location) = location;
let parsed = url::Url::parse(location)?;
let parsed = url::Url::parse(location)
.map_err(VirtualReferenceError::CannotParseUrl)?;
let bucket_name = parsed
.host_str()
.ok_or(StorageError::VirtualBucketParseError(
.ok_or(VirtualReferenceError::CannotParseBucketName(
"error parsing bucket name".into(),
))?
.to_string();
let path = ObjectPath::parse(parsed.path())?;
let path = ObjectPath::parse(parsed.path())
.map_err(VirtualReferenceError::CannotParsePath)?;
let scheme = parsed.scheme();
let cache_key = StoreCacheKey(scheme.into(), bucket_name);

Expand All @@ -400,7 +402,9 @@ impl VirtualChunkResolver for ObjectStoreVirtualChunkResolver {
false => {
let builder = match scheme {
"s3" => AmazonS3Builder::from_env(),
_ => Err(StorageError::UnsupportedScheme(scheme.to_string()))?,
_ => {
Err(VirtualReferenceError::UnsupportedScheme(scheme.to_string()))?
}
};
let new_store = Arc::new(builder.with_bucket_name(&cache_key.1).build()?);
{
Expand Down
13 changes: 13 additions & 0 deletions icechunk/src/storage/virtual_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::format::{manifest::VirtualChunkLocation, ByteRange};
use crate::storage::StorageResult;
use async_trait::async_trait;
use bytes::Bytes;

/// Abstraction over whatever backend can fetch the bytes that a virtual chunk
/// reference points to.
///
/// Declared through `async_trait` so implementors can provide an `async fn`.
#[async_trait]
pub trait VirtualChunkResolver {
/// Fetches chunk bytes for `location`.
///
/// `range` is passed alongside the location; presumably it selects the byte
/// span to read within the referenced object — confirm against implementors.
/// Failures are reported through the storage layer's `StorageResult`.
async fn fetch_chunk(
&self,
location: &VirtualChunkLocation,
range: &ByteRange,
) -> StorageResult<Bytes>;
}

0 comments on commit 4f52f1d

Please sign in to comment.