Skip to content

Commit

Permalink
feat: allow multiple Python threads to work with a single DeltaTable …
Browse files Browse the repository at this point in the history
…instance

This change introduces an internal Mutex inside of RawDeltaTable which
allows the PyO3 bindings to share the Python object between threads at
the Python layer.

PyO3 will raise a `RuntimeError: Already borrowed` for any
function call which takes a mutable reference to `self`. Introducing the
internal Mutex ensures that all function signatures can operate with
just self-references safely.

The Rust-level Mutex is a simple passthrough for most operations which
do not need to modify the underlying state. The critical sections which
typically need to acquire and mutate with a lock are after I/O bound
operations are completed as far as I can tell, so I don't anticipate
deadlock or performance issues.

There is still some cleanup of errors that needs to happen to make the
code here more ergonomic when blending DeltaError with PoisonError from
the lock, as such right now there's a lot of ugly error mapping.

Fixes #2958

Signed-off-by: R. Tyler Croy <[email protected]>
Sponsored-by: Neuralink Corp.
  • Loading branch information
rtyler committed Jan 4, 2025
1 parent 6430151 commit 9848ffb
Show file tree
Hide file tree
Showing 7 changed files with 460 additions and 294 deletions.
3 changes: 2 additions & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.23.1"
version = "0.23.2"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -33,6 +33,7 @@ env_logger = "0"
lazy_static = "1"
regex = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

# runtime
futures = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use arrow_schema::ArrowError;
use deltalake::datafusion::error::DataFusionError;
use deltalake::protocol::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
};
Expand Down Expand Up @@ -96,6 +97,14 @@ pub enum PythonError {
Protocol(#[from] ProtocolError),
#[error("Error in data fusion")]
DataFusion(#[from] DataFusionError),
#[error("Lock acquisition error")]
ThreadingError(String),
}

impl<T> Into<PythonError> for std::sync::PoisonError<T> {
fn into(self) -> PythonError {
PythonError::ThreadingError(self.to_string())
}
}

impl From<PythonError> for pyo3::PyErr {
Expand All @@ -106,6 +115,7 @@ impl From<PythonError> for pyo3::PyErr {
PythonError::Arrow(err) => arrow_to_py(err),
PythonError::Protocol(err) => checkpoint_to_py(err),
PythonError::DataFusion(err) => datafusion_to_py(err),
PythonError::ThreadingError(err) => PyRuntimeError::new_err(err),
}
}
}
4 changes: 2 additions & 2 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ impl DeltaFileSystemHandler {
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = table._table.object_store();
let storage = table.object_store()?;
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table._table.table_uri(),
root_url: table.with_table(|t| Ok(t.table_uri()))?,
options: options.unwrap_or_default(),
},
known_sizes,
Expand Down
Loading

0 comments on commit 9848ffb

Please sign in to comment.