From 22fa5d6d3f0ec469221fb174a0c9886a9300acdf Mon Sep 17 00:00:00 2001 From: Cole MacKenzie Date: Mon, 21 Aug 2023 12:01:54 -0700 Subject: [PATCH] feat: allow configurable number of `commit` attempts This adds `transaction::commit_with_retries` function where the number of attempts can be specified. The default behavior for `transaction::commit` remains 15. Closes #1595. --- rust/src/operations/transaction/mod.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index 8fe7dd69af..fe99af1749 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -151,7 +151,7 @@ pub(crate) async fn try_commit_transaction( Ok(version) } -/// Commit a transaction, with up to 5 retries. This is low-level transaction API. +/// Commit a transaction, with up to 15 retries. This is higher-level transaction API. /// /// Will error early if the a concurrent transaction has already been committed /// and conflicts with this transaction. @@ -161,14 +161,28 @@ pub async fn commit( operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, +) -> DeltaResult { + commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await +} + +/// Commit a transaction, with up configurable number of retries. This is higher-level transaction API. +/// +/// The function will error early if the a concurrent transaction has already been committed +/// and conflicts with this transaction. +pub async fn commit_with_retries( + storage: &dyn ObjectStore, + actions: &Vec, + operation: DeltaOperation, + read_snapshot: &DeltaTableState, + app_metadata: Option>, + max_retries: usize, ) -> DeltaResult { let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?; - let max_attempts = 15; let mut attempt_number = 1; - while attempt_number <= max_attempts { - let version = read_snapshot.version() + attempt_number; + while attempt_number <= max_retries { + let version = read_snapshot.version() + attempt_number as i64; match try_commit_transaction(storage, &tmp_commit, version).await { Ok(version) => return Ok(version), Err(TransactionError::VersionAlreadyExists(version)) => { @@ -199,7 +213,7 @@ pub async fn commit( } } - Err(TransactionError::MaxCommitAttempts(max_attempts as i32).into()) + Err(TransactionError::MaxCommitAttempts(max_retries as i32).into()) } #[cfg(all(test, feature = "parquet"))]