diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index ba3e26cbf9..0d5c77c8e2 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -20,7 +20,7 @@ features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3 deltalake-core = { version = "0.17.1", path = "../core" } deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false, optional = true } deltalake-azure = { version = "0.1.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true } +deltalake-gcp = { version = "0.2", path = "../gcp", optional = true } deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true } deltalake-mount = { version = "0.1.0", path = "../mount", optional = true } diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index daa9042c83..bf94ee29be 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.1.0" +version = "0.2.0" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/gcp/src/lib.rs b/crates/gcp/src/lib.rs index 6fe040d398..ada4838ac1 100644 --- a/crates/gcp/src/lib.rs +++ b/crates/gcp/src/lib.rs @@ -13,6 +13,7 @@ use url::Url; mod config; pub mod error; +mod storage; trait GcpOptions { fn as_gcp_options(&self) -> HashMap; @@ -43,6 +44,7 @@ impl ObjectStoreFactory for GcpFactory { ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?; let (store, prefix) = parse_url_opts(url, config)?; + let store = crate::storage::GcsStorageBackend::try_new(Arc::new(store))?; Ok((url_prefix_handler(store, prefix.clone())?, prefix)) } } diff --git a/crates/gcp/src/storage.rs b/crates/gcp/src/storage.rs new file mode 100644 index 0000000000..9b938b737e --- /dev/null +++ b/crates/gcp/src/storage.rs @@ -0,0 +1,137 @@ +//! GCP GCS storage backend. + +use bytes::Bytes; +use deltalake_core::storage::ObjectStoreRef; +use deltalake_core::Path; +use futures::stream::BoxStream; +use std::ops::Range; +use tokio::io::AsyncWrite; + +use deltalake_core::storage::object_store::{ + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result as ObjectStoreResult, +}; + +pub(crate) struct GcsStorageBackend { + inner: ObjectStoreRef, +} + +impl GcsStorageBackend { + pub fn try_new(storage: ObjectStoreRef) -> ObjectStoreResult { + Ok(Self { inner: storage }) + } +} + +impl std::fmt::Debug for GcsStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "GcsStorageBackend") + } +} + +impl std::fmt::Display for GcsStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "GcsStorageBackend") + } +} + +#[async_trait::async_trait] +impl ObjectStore for GcsStorageBackend { + async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult { + self.inner.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn get(&self, location: &Path) -> ObjectStoreResult { + self.inner.get(location).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { + self.inner.get_range(location, range).await + } + + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + let res = self.inner.rename_if_not_exists(from, to).await; + match res { + Ok(_) => Ok(()), + Err(e) => { + match e { + object_store::Error::Generic { store, source } => { + // If this is a 429 (rate limit) error it means more than 1 mutation operation per second + // Was attempted on this same key + // That means we're experiencing concurrency conflicts, so return a transaction error + // Source would be a reqwest error which we don't have access to so the easiest thing to do is check + // for "429" in the error message + if format!("{:?}", source).contains("429") { + Err(object_store::Error::AlreadyExists { + path: to.to_string(), + source, + }) + } else { + Err(object_store::Error::Generic { store, source }) + } + } + _ => Err(e), + } + } + } + } + + async fn put_multipart( + &self, + location: &Path, + ) -> ObjectStoreResult<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> ObjectStoreResult<()> { + self.inner.abort_multipart(location, multipart_id).await + } +}