From 5901f90cb4f45c311b0b05eb9326060edfd53381 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 19 Aug 2024 12:07:24 +0200 Subject: [PATCH 1/3] feat: allow custom IO runtime --- crates/aws/src/storage.rs | 41 +-- crates/core/src/kernel/snapshot/mod.rs | 2 +- crates/core/src/lib.rs | 4 +- crates/core/src/logstore/mod.rs | 29 +- .../core/src/operations/convert_to_delta.rs | 3 +- crates/core/src/operations/writer.rs | 1 + crates/core/src/storage/mod.rs | 300 +++++++++++++++++- crates/core/src/table/builder.rs | 130 ++++---- crates/core/src/table/mod.rs | 9 +- crates/core/tests/integration_datafusion.rs | 2 +- python/src/lib.rs | 5 +- 11 files changed, 414 insertions(+), 112 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 4625bb6be9..d5609d321d 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -78,26 +78,31 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { Some((s3_key, value.clone())) }), )?; + let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?; - let store = limit_store_handler(inner, &options); - - // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. - if options - .0 - .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) - { - Ok((store, prefix)) - } else { - let s3_options = S3StorageOptions::from_map(&storage_options.0)?; - - let store = S3StorageBackend::try_new( - store, - Some("dynamodb") == s3_options.locking_provider.as_deref() - || s3_options.allow_unsafe_rename, - )?; + Ok((store, prefix)) + } +} - Ok((Arc::new(store), prefix)) - } +fn aws_storage_handler( + store: ObjectStoreRef, + options: &StorageOptions, +) -> DeltaResult { + // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. + if options + .0 + .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) + { + Ok(store) + } else { + let s3_options = S3StorageOptions::from_map(&options.0)?; + + let store = S3StorageBackend::try_new( + store, + Some("dynamodb") == s3_options.locking_provider.as_deref() + || s3_options.allow_unsafe_rename, + )?; + Ok(Arc::new(store)) } } diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index a131b33067..8d5101f2df 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -657,7 +657,7 @@ impl EagerSnapshot { } } -fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult { +fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult { let stats_fields = if let Some(stats_cols) = config.stats_columns() { stats_cols .iter() diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 93e3ff29ed..86a381de70 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -92,9 +92,7 @@ pub use self::data_catalog::{DataCatalog, DataCatalogError}; pub use self::errors::*; pub use self::schema::partitions::*; pub use self::schema::*; -pub use self::table::builder::{ - DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion, -}; +pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; pub use self::table::config::DeltaConfigKey; pub use self::table::DeltaTable; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index c808c95176..137faa83f4 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -18,9 +18,12 @@ use url::Url; use crate::kernel::Action; use crate::operations::transaction::TransactionError; use crate::protocol::{get_last_checkpoint, ProtocolError}; +use crate::storage::DeltaIOStorageBackend; use crate::storage::{ - commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions, + commit_uri_from_version, retry_ext::ObjectStoreRetryExt, IORuntime, ObjectStoreRef, + StorageOptions, }; + use crate::{DeltaResult, DeltaTableError}; #[cfg(feature = "datafusion")] @@ -98,11 +101,12 @@ lazy_static! { /// # use std::collections::HashMap; /// # use url::Url; /// let location = Url::parse("memory:///").expect("Failed to make location"); -/// let logstore = logstore_for(location, HashMap::new()).expect("Failed to get a logstore"); +/// let logstore = logstore_for(location, HashMap::new(), None).expect("Failed to get a logstore"); /// ``` pub fn logstore_for( location: Url, options: impl Into + Clone, + io_runtime: Option, ) -> DeltaResult { // turn location into scheme let scheme = Url::parse(&format!("{}://", location.scheme())) @@ -110,10 +114,11 @@ pub fn logstore_for( if let Some(entry) = crate::storage::factories().get(&scheme) { debug!("Found a storage provider for {scheme} ({location})"); + let (store, _prefix) = entry .value() .parse_url_opts(&location, &options.clone().into())?; - return logstore_with(store, location, options); + return logstore_with(store, location, options, io_runtime); } Err(DeltaTableError::InvalidTableLocation(location.into())) } @@ -123,10 +128,17 @@ pub fn logstore_with( store: ObjectStoreRef, location: Url, options: impl Into + Clone, + io_runtime: Option, ) -> DeltaResult { let scheme = Url::parse(&format!("{}://", location.scheme())) .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; + let store = if let Some(io_runtime) = io_runtime { + Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_rt())) as ObjectStoreRef + } else { + store + }; + if let Some(factory) = logstores().get(&scheme) { debug!("Found a logstore provider for {scheme}"); return factory.with_options(store, &location, &options.into()); @@ -471,14 +483,21 @@ mod tests { #[test] fn logstore_with_invalid_url() { let location = Url::parse("nonexistent://table").unwrap(); - let store = logstore_for(location, HashMap::default()); + let store = logstore_for(location, HashMap::default(), None); assert!(store.is_err()); } #[test] fn logstore_with_memory() { let location = Url::parse("memory://table").unwrap(); - let store = logstore_for(location, HashMap::default()); + let store = logstore_for(location, HashMap::default(), None); + assert!(store.is_ok()); + } + + #[test] + fn logstore_with_memory_and_rt() { + let location = Url::parse("memory://table").unwrap(); + let store = logstore_for(location, HashMap::default(), Some(IORuntime::default())); assert!(store.is_ok()); } } diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index 1a0931c4ac..ceb6cac3c4 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -239,6 +239,7 @@ impl ConvertToDeltaBuilder { crate::logstore::logstore_for( ensure_table_uri(location)?, self.storage_options.unwrap_or_default(), + None, // TODO: allow runtime to be passed into builder )? } else { return Err(Error::MissingLocation); @@ -477,7 +478,7 @@ mod tests { fn log_store(path: impl Into) -> LogStoreRef { let path: String = path.into(); let location = ensure_table_uri(path).expect("Failed to get the URI from the path"); - crate::logstore::logstore_for(location, StorageOptions::default()) + crate::logstore::logstore_for(location, StorageOptions::default(), None) .expect("Failed to create an object store") } diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 8fd4273c9f..3c9d3bda97 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -369,6 +369,7 @@ impl PartitionWriter { // write file to object store self.object_store.put(&path, buffer.into()).await?; + self.files_written.push( create_add( &self.config.partition_values, diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index ceac5e1436..6d07b4e133 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -2,24 +2,32 @@ use std::collections::HashMap; use std::sync::{Arc, OnceLock}; +use crate::{DeltaResult, DeltaTableError}; use dashmap::DashMap; +use futures::future::BoxFuture; +use futures::FutureExt; +use futures::TryFutureExt; use lazy_static::lazy_static; use object_store::limit::LimitStore; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::prefix::PrefixStore; +use object_store::{GetOptions, PutOptions, PutPayload, PutResult}; use serde::{Deserialize, Serialize}; +use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use url::Url; -use crate::{DeltaResult, DeltaTableError}; - +use bytes::Bytes; +use futures::stream::BoxStream; pub use object_store; pub use object_store::path::{Path, DELIMITER}; pub use object_store::{ DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result as ObjectStoreResult, }; +use object_store::{MultipartUpload, PutMultipartOpts}; pub use retry_ext::ObjectStoreRetryExt; +use std::ops::Range; pub use utils::*; pub mod file; @@ -30,6 +38,294 @@ lazy_static! { static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); } +/// Creates static IO Runtime with optional configuration +fn io_rt(config: Option<&RuntimeConfig>) -> &'static Runtime { + static IO_RT: OnceLock = OnceLock::new(); + IO_RT.get_or_init(|| { + let rt = match config { + Some(config) => { + let mut builder = if config.multi_threaded { + RuntimeBuilder::new_multi_thread() + } else { + RuntimeBuilder::new_current_thread() + }; + let builder = builder.worker_threads(config.worker_threads); + let builder = if config.enable_io && config.enable_time { + builder.enable_all() + } else if config.enable_io && !config.enable_time { + builder.enable_io() + } else if !config.enable_io && config.enable_time { + builder.enable_time() + } else { + builder + }; + builder + .thread_name( + config + .thread_name + .clone() + .unwrap_or("IO-runtime".to_string()), + ) + .build() + } + _ => Runtime::new(), + }; + rt.expect("Failed to create a tokio runtime for IO.") + }) +} + +/// Configuration for Tokio runtime +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeConfig { + multi_threaded: bool, + worker_threads: usize, + thread_name: Option, + enable_io: bool, + enable_time: bool, +} + +/// Provide custom Tokio RT or a runtime config +#[derive(Debug, Clone)] +pub enum IORuntime { + /// Tokio RT handle + RT(&'static Runtime), + /// Configuration for tokio runtime + Config(RuntimeConfig), +} + +impl Default for IORuntime { + fn default() -> Self { + IORuntime::RT(io_rt(None)) + } +} + +impl IORuntime { + /// Retrieves the Tokio runtime for IO bound operations + pub fn get_rt(&self) -> &'static Runtime { + match self { + IORuntime::RT(handle) => *handle, + IORuntime::Config(config) => io_rt(Some(config)), + } + } +} + +/// Wraps any object store and runs IO in it's own runtime [EXPERIMENTAL] +pub struct DeltaIOStorageBackend { + inner: ObjectStoreRef, + rt_handle: &'static Runtime, +} + +impl DeltaIOStorageBackend { + /// create wrapped object store which spawns tasks in own runtime + pub fn new(storage: ObjectStoreRef, rt_handle: &'static Runtime) -> Self { + Self { + inner: storage, + rt_handle, + } + } + + /// spawn taks on IO runtime + pub fn spawn_io_rt( + &self, + f: F, + store: &Arc, + path: Path, + ) -> BoxFuture<'_, ObjectStoreResult> + where + F: for<'a> FnOnce( + &'a Arc, + &'a Path, + ) -> BoxFuture<'a, ObjectStoreResult> + + Send + + 'static, + O: Send + 'static, + { + let store = Arc::clone(store); + let fut = self.rt_handle.spawn(async move { f(&store, &path).await }); + fut.unwrap_or_else(|e| match e.try_into_panic() { + Ok(p) => std::panic::resume_unwind(p), + Err(e) => Err(ObjectStoreError::JoinError { source: e }), + }) + .boxed() + } + + /// spawn taks on IO runtime + pub fn spawn_io_rt_from_to( + &self, + f: F, + store: &Arc, + from: Path, + to: Path, + ) -> BoxFuture<'_, ObjectStoreResult> + where + F: for<'a> FnOnce( + &'a Arc, + &'a Path, + &'a Path, + ) -> BoxFuture<'a, ObjectStoreResult> + + Send + + 'static, + O: Send + 'static, + { + let store = Arc::clone(store); + let fut = self + .rt_handle + .spawn(async move { f(&store, &from, &to).await }); + fut.unwrap_or_else(|e| match e.try_into_panic() { + Ok(p) => std::panic::resume_unwind(p), + Err(e) => Err(ObjectStoreError::JoinError { source: e }), + }) + .boxed() + } +} + +impl std::fmt::Debug for DeltaIOStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "DeltaIOStorageBackend") + } +} + +impl std::fmt::Display for DeltaIOStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "DeltaIOStorageBackend") + } +} + +#[async_trait::async_trait] +impl ObjectStore for DeltaIOStorageBackend { + async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult { + self.spawn_io_rt( + |store, path| store.put(path, bytes), + &self.inner, + location.clone(), + ) + .await + } + + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + options: PutOptions, + ) -> ObjectStoreResult { + self.spawn_io_rt( + |store, path| store.put_opts(path, bytes, options), + &self.inner, + location.clone(), + ) + .await + } + + async fn get(&self, location: &Path) -> ObjectStoreResult { + self.spawn_io_rt(|store, path| store.get(path), &self.inner, location.clone()) + .await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.spawn_io_rt( + |store, path| store.get_opts(path, options), + &self.inner, + location.clone(), + ) + .await + } + + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { + self.spawn_io_rt( + |store, path| store.get_range(path, range), + &self.inner, + location.clone(), + ) + .await + } + + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.spawn_io_rt( + |store, path| store.head(path), + &self.inner, + location.clone(), + ) + .await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.spawn_io_rt( + |store, path| store.delete(path), + &self.inner, + location.clone(), + ) + .await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.spawn_io_rt_from_to( + |store, from_path, to_path| store.copy(from_path, to_path), + &self.inner, + from.clone(), + to.clone(), + ) + .await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.spawn_io_rt_from_to( + |store, from_path, to_path| store.copy_if_not_exists(from_path, to_path), + &self.inner, + from.clone(), + to.clone(), + ) + .await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.spawn_io_rt_from_to( + |store, from_path, to_path| store.rename_if_not_exists(from_path, to_path), + &self.inner, + from.clone(), + to.clone(), + ) + .await + } + + async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> { + self.spawn_io_rt( + |store, path| store.put_multipart(path), + &self.inner, + location.clone(), + ) + .await + } + + async fn put_multipart_opts( + &self, + location: &Path, + options: PutMultipartOpts, + ) -> ObjectStoreResult> { + self.spawn_io_rt( + |store, path| store.put_multipart_opts(path, options), + &self.inner, + location.clone(), + ) + .await + } +} + /// Sharable reference to [`ObjectStore`] pub type ObjectStoreRef = Arc; diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index b421a6199b..908f75130a 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -13,7 +13,7 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::LogStoreRef; -use crate::storage::{factories, StorageOptions}; +use crate::storage::{factories, IORuntime, StorageOptions}; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -51,7 +51,7 @@ pub enum DeltaVersion { } /// Configuration options for delta table -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct DeltaTableConfig { /// Indicates whether our use case requires tracking tombstones. @@ -79,6 +79,9 @@ pub struct DeltaTableConfig { /// Control the number of records to read / process from the commit / checkpoint files /// when processing record batches. pub log_batch_size: usize, + #[serde(skip_serializing, skip_deserializing)] + /// When a runtime handler is provided, all IO tasks are spawn in that handle + pub io_runtime: Option, } impl Default for DeltaTableConfig { @@ -88,68 +91,34 @@ impl Default for DeltaTableConfig { require_files: true, log_buffer_size: num_cpus::get() * 4, log_batch_size: 1024, + io_runtime: None, } } } -/// Load-time delta table configuration options -#[derive(Debug)] -pub struct DeltaTableLoadOptions { - /// table root uri - pub table_uri: String, - /// backend to access storage system - pub storage_backend: Option<(Arc, Url)>, - /// specify the version we are going to load: a time stamp, a version, or just the newest - /// available version - pub version: DeltaVersion, - /// Indicates whether our use case requires tracking tombstones. - /// This defaults to `true` - /// - /// Read-only applications never require tombstones. Tombstones - /// are only required when writing checkpoints, so even many writers - /// may want to skip them. - pub require_tombstones: bool, - /// Indicates whether DeltaTable should track files. - /// This defaults to `true` - /// - /// Some append-only applications might have no need of tracking any files. - /// Hence, DeltaTable will be loaded with significant memory reduction. - pub require_files: bool, - /// Controls how many files to buffer from the commit log when updating the table. - /// This defaults to 4 * number of cpus - /// - /// Setting a value greater than 1 results in concurrent calls to the storage api. - /// This can be helpful to decrease latency if there are many files in the log since the - /// last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should - /// also be considered for optimal performance. - pub log_buffer_size: usize, - /// Control the number of records to read / process from the commit / checkpoint files - /// when processing record batches. - pub log_batch_size: usize, -} - -impl DeltaTableLoadOptions { - /// create default table load options for a table uri - pub fn new(table_uri: impl Into) -> Self { - Self { - table_uri: table_uri.into(), - storage_backend: None, - require_tombstones: true, - require_files: true, - log_buffer_size: num_cpus::get() * 4, - version: DeltaVersion::default(), - log_batch_size: 1024, - } +impl PartialEq for DeltaTableConfig { + fn eq(&self, other: &Self) -> bool { + self.require_tombstones == other.require_tombstones + && self.require_files == other.require_files + && self.log_buffer_size == other.log_buffer_size + && self.log_batch_size == other.log_batch_size } } /// builder for configuring a delta table load. #[derive(Debug)] pub struct DeltaTableBuilder { - options: DeltaTableLoadOptions, + /// table root uri + table_uri: String, + /// backend to access storage system + storage_backend: Option<(Arc, Url)>, + /// specify the version we are going to load: a time stamp, a version, or just the newest + /// available version + version: DeltaVersion, storage_options: Option>, #[allow(unused_variables)] allow_http: Option, + table_config: DeltaTableConfig, } impl DeltaTableBuilder { @@ -190,27 +159,30 @@ impl DeltaTableBuilder { debug!("creating table builder with {url}"); Ok(Self { - options: DeltaTableLoadOptions::new(url), + table_uri: url.into(), + storage_backend: None, + version: DeltaVersion::default(), storage_options: None, allow_http: None, + table_config: DeltaTableConfig::default(), }) } /// Sets `require_tombstones=false` to the builder pub fn without_tombstones(mut self) -> Self { - self.options.require_tombstones = false; + self.table_config.require_tombstones = false; self } /// Sets `require_files=false` to the builder pub fn without_files(mut self) -> Self { - self.options.require_files = false; + self.table_config.require_files = false; self } /// Sets `version` to the builder pub fn with_version(mut self, version: i64) -> Self { - self.options.version = DeltaVersion::Version(version); + self.version = DeltaVersion::Version(version); self } @@ -221,7 +193,7 @@ impl DeltaTableBuilder { "Log buffer size should be positive", ))); } - self.options.log_buffer_size = log_buffer_size; + self.table_config.log_buffer_size = log_buffer_size; Ok(self) } @@ -235,7 +207,7 @@ impl DeltaTableBuilder { /// specify a timestamp pub fn with_timestamp(mut self, timestamp: DateTime) -> Self { - self.options.version = DeltaVersion::Timestamp(timestamp); + self.version = DeltaVersion::Timestamp(timestamp); self } @@ -248,7 +220,7 @@ impl DeltaTableBuilder { /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). /// * `location` - A url corresponding to the storagle location of `storage`. pub fn with_storage_backend(mut self, storage: Arc, location: Url) -> Self { - self.options.storage_backend = Some((storage, location)); + self.storage_backend = Some((storage, location)); self } @@ -273,6 +245,12 @@ impl DeltaTableBuilder { self } + /// Provide a custom runtime handle or runtime config + pub fn with_io_runtime(mut self, io_runtime: IORuntime) -> Self { + self.table_config.io_runtime = Some(io_runtime); + self + } + /// Storage options for configuring backend object store pub fn storage_options(&self) -> StorageOptions { let mut storage_options = self.storage_options.clone().unwrap_or_default(); @@ -286,22 +264,28 @@ impl DeltaTableBuilder { } /// Build a delta storage backend for the given config - pub fn build_storage(self) -> DeltaResult { - debug!("build_storage() with {}", &self.options.table_uri); - let location = Url::parse(&self.options.table_uri).map_err(|_| { - DeltaTableError::NotATable(format!( - "Could not turn {} into a URL", - self.options.table_uri - )) + pub fn build_storage(&self) -> DeltaResult { + debug!("build_storage() with {}", self.table_uri); + let location = Url::parse(&self.table_uri).map_err(|_| { + DeltaTableError::NotATable(format!("Could not turn {} into a URL", self.table_uri)) })?; - if let Some((store, _url)) = self.options.storage_backend.as_ref() { + if let Some((store, _url)) = self.storage_backend.as_ref() { debug!("Loading a logstore with a custom store: {store:?}"); - crate::logstore::logstore_with(store.clone(), location, self.storage_options()) + crate::logstore::logstore_with( + store.clone(), + location, + self.storage_options(), + self.table_config.io_runtime.clone(), + ) } else { // If there has been no backend defined just default to the normal logstore look up debug!("Loading a logstore based off the location: {location:?}"); - crate::logstore::logstore_for(location, self.storage_options()) + crate::logstore::logstore_for( + location, + self.storage_options(), + self.table_config.io_runtime.clone(), + ) } } @@ -310,18 +294,12 @@ impl DeltaTableBuilder { /// This will not load the log, i.e. the table is not initialized. To get an initialized /// table use the `load` function pub fn build(self) -> DeltaResult { - let config = DeltaTableConfig { - require_tombstones: self.options.require_tombstones, - require_files: self.options.require_files, - log_buffer_size: self.options.log_buffer_size, - log_batch_size: self.options.log_batch_size, - }; - Ok(DeltaTable::new(self.build_storage()?, config)) + Ok(DeltaTable::new(self.build_storage()?, self.table_config)) } /// Build the [`DeltaTable`] and load its state pub async fn load(self) -> DeltaResult { - let version = self.options.version; + let version = self.version; let mut table = self.build()?; match version { DeltaVersion::Newest => table.load().await?, diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 10ca7bd770..c7ee976eeb 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -239,9 +239,12 @@ impl<'de> Deserialize<'de> for DeltaTable { let storage_config: LogStoreConfig = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let log_store = - crate::logstore::logstore_for(storage_config.location, storage_config.options) - .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; + let log_store = crate::logstore::logstore_for( + storage_config.location, + storage_config.options, + None, + ) + .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; let table = DeltaTable { state, diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index bd7d53612d..3a55c63bb5 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -238,7 +238,7 @@ mod local { .clone(), ) .unwrap(); - let source_store = logstore_for(source_uri, HashMap::new()).unwrap(); + let source_store = logstore_for(source_uri, HashMap::new(), None).unwrap(); let object_store_url = source_store.object_store_url(); let source_store_url: &Url = object_store_url.as_ref(); state diff --git a/python/src/lib.rs b/python/src/lib.rs index 41f751df33..2d6ce42952 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -24,7 +24,6 @@ use deltalake::datafusion::catalog::TableProvider; use deltalake::datafusion::datasource::memory::MemTable; use deltalake::datafusion::physical_plan::ExecutionPlan; use deltalake::datafusion::prelude::SessionContext; -use deltalake::delta_datafusion::cdf::FileAction; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::kernel::{ @@ -52,6 +51,7 @@ use deltalake::parquet::errors::ParquetError; use deltalake::parquet::file::properties::WriterProperties; use deltalake::partitions::PartitionFilter; use deltalake::protocol::{DeltaOperation, SaveMode}; +use deltalake::storage::IORuntime; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; use futures::future::join_all; @@ -111,7 +111,8 @@ impl RawDeltaTable { log_buffer_size: Option, ) -> PyResult { py.allow_threads(|| { - let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri); + let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri) + .with_io_runtime(IORuntime::default()); let options = storage_options.clone().unwrap_or_default(); if let Some(storage_options) = storage_options { builder = builder.with_storage_options(storage_options) From 5fa5c7e12dc5a93bb615091ea8a5c8da7e16e383 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 19 Aug 2024 18:34:43 +0200 Subject: [PATCH 2/3] refactor: allow handle --- crates/core/src/logstore/mod.rs | 2 +- crates/core/src/storage/mod.rs | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 137faa83f4..94f88b5944 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -134,7 +134,7 @@ pub fn logstore_with( .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; let store = if let Some(io_runtime) = io_runtime { - Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_rt())) as ObjectStoreRef + Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_handle())) as ObjectStoreRef } else { store }; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 6d07b4e133..37daab755f 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -14,7 +14,7 @@ use object_store::memory::InMemory; use object_store::prefix::PrefixStore; use object_store::{GetOptions, PutOptions, PutPayload, PutResult}; use serde::{Deserialize, Serialize}; -use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; +use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; use url::Url; use bytes::Bytes; @@ -39,7 +39,7 @@ lazy_static! { } /// Creates static IO Runtime with optional configuration -fn io_rt(config: Option<&RuntimeConfig>) -> &'static Runtime { +fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime { static IO_RT: OnceLock = OnceLock::new(); IO_RT.get_or_init(|| { let rt = match config { @@ -88,36 +88,37 @@ pub struct RuntimeConfig { #[derive(Debug, Clone)] pub enum IORuntime { /// Tokio RT handle - RT(&'static Runtime), + RT(Handle), /// Configuration for tokio runtime Config(RuntimeConfig), } impl Default for IORuntime { fn default() -> Self { - IORuntime::RT(io_rt(None)) + IORuntime::RT(io_rt(None).handle().clone()) } } impl IORuntime { /// Retrieves the Tokio runtime for IO bound operations - pub fn get_rt(&self) -> &'static Runtime { + pub fn get_handle(&self) -> Handle { match self { - IORuntime::RT(handle) => *handle, - IORuntime::Config(config) => io_rt(Some(config)), + IORuntime::RT(handle) => handle, + IORuntime::Config(config) => io_rt(Some(config)).handle(), } + .clone() } } /// Wraps any object store and runs IO in it's own runtime [EXPERIMENTAL] pub struct DeltaIOStorageBackend { inner: ObjectStoreRef, - rt_handle: &'static Runtime, + rt_handle: Handle, } impl DeltaIOStorageBackend { /// create wrapped object store which spawns tasks in own runtime - pub fn new(storage: ObjectStoreRef, rt_handle: &'static Runtime) -> Self { + pub fn new(storage: ObjectStoreRef, rt_handle: Handle) -> Self { Self { inner: storage, rt_handle, From 1131d82261236681deb535ff96c40d7cf5e0da1d Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 19 Aug 2024 20:12:20 +0200 Subject: [PATCH 3/3] chore: bump python --- python/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index c6fd6e26a0..70eb378e20 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.19.0" +version = "0.19.1" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0"