From 59b96bcb67797b0376dbcf82603e7644a40ed247 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 8 Sep 2023 08:26:50 -0700 Subject: [PATCH] Fixed up vague argument in commit method --- python/python/lance/dataset.py | 19 ++++++++++++++----- python/src/dataset.rs | 13 ++++++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index c06c8a7d63..02f9fa1c93 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -684,7 +684,7 @@ def _commit( base_uri: Union[str, Path], operation: LanceOperation.BaseOperation, read_version: Optional[int] = None, - options: Optional[Dict[str, Any]] = None, + commit_lock: Optional[CommitLock] = None, ) -> LanceDataset: """Create a new version of dataset @@ -709,9 +709,9 @@ def _commit( read_version: int, optional The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations. - options : dict, optional - Options for controlling how the commit is performed. For example, - what object store to use to access the manifest. + commit_lock : CommitLock, optional + A custom commit lock. Only needed if your object store does not support + atomic commits. See the user guide for more details. Returns ------- @@ -726,7 +726,13 @@ def _commit( if isinstance(base_uri, Path): base_uri = str(base_uri) - _Dataset.commit(base_uri, operation._to_inner(), read_version, options) + if commit_lock: + if not callable(commit_lock): + raise TypeError( + f"commit_lock must be a function, got {type(commit_lock)}" + ) + + _Dataset.commit(base_uri, operation._to_inner(), read_version, commit_lock) return LanceDataset(base_uri) @@ -1091,6 +1097,9 @@ def write_dataset( The max number of rows to write before starting a new file max_rows_per_group: int, default 1024 The max number of rows before starting a new group (in the same file) + commit_lock : CommitLock, optional + A custom commit lock. Only needed if your object store does not support + atomic commits. See the user guide for more details. """ reader = _coerce_reader(data_obj, schema) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 1aa3c6c5e2..de5a386f51 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -590,12 +590,19 @@ impl Dataset { dataset_uri: &str, operation: Operation, read_version: Option, - options: Option<&PyDict>, + commit_lock: Option<&PyAny>, ) -> PyResult { - let store_params = options.map(get_object_store_params).unwrap_or(None); + let store_params = if let Some(commit_handler) = commit_lock { + let py_commit_lock = PyCommitLock::new(commit_handler.to_object(commit_handler.py())); + let mut object_store_params = ObjectStoreParams::default(); + object_store_params.set_commit_lock(Arc::new(py_commit_lock)); + Some(object_store_params) + } else { + None + }; let ds = RT .block_on( - options.map(|opts| opts.py()), + commit_lock.map(|cl| cl.py()), LanceDataset::commit(dataset_uri, operation.0, read_version, store_params), ) .map_err(|e| PyIOError::new_err(e.to_string()))?;