Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable IO runtime #2789

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,31 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
Some((s3_key, value.clone()))
}),
)?;
let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;

let store = limit_store_handler(inner, &options);

// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&storage_options.0)?;

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok((store, prefix))
}
}

Ok((Arc::new(store), prefix))
}
fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok(store)
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok(Arc::new(store))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ impl EagerSnapshot {
}
}

fn stats_schema<'a>(schema: &StructType, config: TableConfig<'a>) -> DeltaResult<StructType> {
fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult<StructType> {
let stats_fields = if let Some(stats_cols) = config.stats_columns() {
stats_cols
.iter()
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ pub use self::data_catalog::{DataCatalog, DataCatalogError};
pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::builder::{
DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion,
};
pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
pub use self::table::config::DeltaConfigKey;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
Expand Down
29 changes: 24 additions & 5 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ use url::Url;
use crate::kernel::Action;
use crate::operations::transaction::TransactionError;
use crate::protocol::{get_last_checkpoint, ProtocolError};
use crate::storage::DeltaIOStorageBackend;
use crate::storage::{
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions,
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, IORuntime, ObjectStoreRef,
StorageOptions,
};

use crate::{DeltaResult, DeltaTableError};

#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -98,22 +101,24 @@ lazy_static! {
/// # use std::collections::HashMap;
/// # use url::Url;
/// let location = Url::parse("memory:///").expect("Failed to make location");
/// let logstore = logstore_for(location, HashMap::new()).expect("Failed to get a logstore");
/// let logstore = logstore_for(location, HashMap::new(), None).expect("Failed to get a logstore");
/// ```
pub fn logstore_for(
location: Url,
options: impl Into<StorageOptions> + Clone,
io_runtime: Option<IORuntime>,
) -> DeltaResult<LogStoreRef> {
// turn location into scheme
let scheme = Url::parse(&format!("{}://", location.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

if let Some(entry) = crate::storage::factories().get(&scheme) {
debug!("Found a storage provider for {scheme} ({location})");

let (store, _prefix) = entry
.value()
.parse_url_opts(&location, &options.clone().into())?;
return logstore_with(store, location, options);
return logstore_with(store, location, options, io_runtime);
}
Err(DeltaTableError::InvalidTableLocation(location.into()))
}
Expand All @@ -123,10 +128,17 @@ pub fn logstore_with(
store: ObjectStoreRef,
location: Url,
options: impl Into<StorageOptions> + Clone,
io_runtime: Option<IORuntime>,
) -> DeltaResult<LogStoreRef> {
let scheme = Url::parse(&format!("{}://", location.scheme()))
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

let store = if let Some(io_runtime) = io_runtime {
Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_handle())) as ObjectStoreRef
} else {
store
};

if let Some(factory) = logstores().get(&scheme) {
debug!("Found a logstore provider for {scheme}");
return factory.with_options(store, &location, &options.into());
Expand Down Expand Up @@ -471,14 +483,21 @@ mod tests {
#[test]
fn logstore_with_invalid_url() {
let location = Url::parse("nonexistent://table").unwrap();
let store = logstore_for(location, HashMap::default());
let store = logstore_for(location, HashMap::default(), None);
assert!(store.is_err());
}

#[test]
fn logstore_with_memory() {
let location = Url::parse("memory://table").unwrap();
let store = logstore_for(location, HashMap::default());
let store = logstore_for(location, HashMap::default(), None);
assert!(store.is_ok());
}

#[test]
fn logstore_with_memory_and_rt() {
let location = Url::parse("memory://table").unwrap();
let store = logstore_for(location, HashMap::default(), Some(IORuntime::default()));
assert!(store.is_ok());
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl ConvertToDeltaBuilder {
crate::logstore::logstore_for(
ensure_table_uri(location)?,
self.storage_options.unwrap_or_default(),
None, // TODO: allow runtime to be passed into builder
)?
} else {
return Err(Error::MissingLocation);
Expand Down Expand Up @@ -477,7 +478,7 @@ mod tests {
fn log_store(path: impl Into<String>) -> LogStoreRef {
let path: String = path.into();
let location = ensure_table_uri(path).expect("Failed to get the URI from the path");
crate::logstore::logstore_for(location, StorageOptions::default())
crate::logstore::logstore_for(location, StorageOptions::default(), None)
.expect("Failed to create an object store")
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ impl PartitionWriter {

// write file to object store
self.object_store.put(&path, buffer.into()).await?;

self.files_written.push(
create_add(
&self.config.partition_values,
Expand Down
Loading
Loading