diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 137faa83f4..94f88b5944 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -134,7 +134,7 @@ pub fn logstore_with( .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; let store = if let Some(io_runtime) = io_runtime { - Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_rt())) as ObjectStoreRef + Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_handle())) as ObjectStoreRef } else { store }; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 6d07b4e133..37daab755f 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -14,7 +14,7 @@ use object_store::memory::InMemory; use object_store::prefix::PrefixStore; use object_store::{GetOptions, PutOptions, PutPayload, PutResult}; use serde::{Deserialize, Serialize}; -use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; +use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; use url::Url; use bytes::Bytes; @@ -39,7 +39,7 @@ lazy_static! { } /// Creates static IO Runtime with optional configuration -fn io_rt(config: Option<&RuntimeConfig>) -> &'static Runtime { +fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime { static IO_RT: OnceLock = OnceLock::new(); IO_RT.get_or_init(|| { let rt = match config { @@ -88,36 +88,37 @@ pub struct RuntimeConfig { #[derive(Debug, Clone)] pub enum IORuntime { /// Tokio RT handle - RT(&'static Runtime), + RT(Handle), /// Configuration for tokio runtime Config(RuntimeConfig), } impl Default for IORuntime { fn default() -> Self { - IORuntime::RT(io_rt(None)) + IORuntime::RT(io_rt(None).handle().clone()) } } impl IORuntime { /// Retrieves the Tokio runtime for IO bound operations - pub fn get_rt(&self) -> &'static Runtime { + pub fn get_handle(&self) -> Handle { match self { - IORuntime::RT(handle) => *handle, - IORuntime::Config(config) => io_rt(Some(config)), + IORuntime::RT(handle) => handle, + IORuntime::Config(config) => io_rt(Some(config)).handle(), } + .clone() } } /// Wraps any object store and runs IO in it's own runtime [EXPERIMENTAL] pub struct DeltaIOStorageBackend { inner: ObjectStoreRef, - rt_handle: &'static Runtime, + rt_handle: Handle, } impl DeltaIOStorageBackend { /// create wrapped object store which spawns tasks in own runtime - pub fn new(storage: ObjectStoreRef, rt_handle: &'static Runtime) -> Self { + pub fn new(storage: ObjectStoreRef, rt_handle: Handle) -> Self { Self { inner: storage, rt_handle,