Skip to content

Commit

Permalink
refactor: allow handle
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 19, 2024
1 parent 5901f90 commit 5fa5c7e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub fn logstore_with(
.map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

let store = if let Some(io_runtime) = io_runtime {
Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_rt())) as ObjectStoreRef
Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_handle())) as ObjectStoreRef
} else {
store
};
Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use object_store::memory::InMemory;
use object_store::prefix::PrefixStore;
use object_store::{GetOptions, PutOptions, PutPayload, PutResult};
use serde::{Deserialize, Serialize};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime};
use url::Url;

use bytes::Bytes;
Expand All @@ -39,7 +39,7 @@ lazy_static! {
}

/// Creates static IO Runtime with optional configuration
fn io_rt(config: Option<&RuntimeConfig>) -> &'static Runtime {
fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime {
static IO_RT: OnceLock<Runtime> = OnceLock::new();
IO_RT.get_or_init(|| {
let rt = match config {
Expand Down Expand Up @@ -88,36 +88,37 @@ pub struct RuntimeConfig {
#[derive(Debug, Clone)]
pub enum IORuntime {
/// Tokio RT handle
RT(&'static Runtime),
RT(Handle),
/// Configuration for tokio runtime
Config(RuntimeConfig),
}

impl Default for IORuntime {
fn default() -> Self {
IORuntime::RT(io_rt(None))
IORuntime::RT(io_rt(None).handle().clone())
}
}

impl IORuntime {
/// Retrieves the Tokio runtime for IO bound operations
pub fn get_rt(&self) -> &'static Runtime {
pub fn get_handle(&self) -> Handle {
match self {
IORuntime::RT(handle) => *handle,
IORuntime::Config(config) => io_rt(Some(config)),
IORuntime::RT(handle) => handle,
IORuntime::Config(config) => io_rt(Some(config)).handle(),
}
.clone()
}
}

/// Wraps any object store and runs IO in it's own runtime [EXPERIMENTAL]
pub struct DeltaIOStorageBackend {
inner: ObjectStoreRef,
rt_handle: &'static Runtime,
rt_handle: Handle,
}

impl DeltaIOStorageBackend {
/// create wrapped object store which spawns tasks in own runtime
pub fn new(storage: ObjectStoreRef, rt_handle: &'static Runtime) -> Self {
pub fn new(storage: ObjectStoreRef, rt_handle: Handle) -> Self {
Self {
inner: storage,
rt_handle,
Expand Down

0 comments on commit 5fa5c7e

Please sign in to comment.