Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow configurable number of commit attempts #1596

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub(crate) async fn try_commit_transaction(
Ok(version)
}

/// Commit a transaction, with up to 5 retries. This is low-level transaction API.
/// Commit a transaction, with up to 15 retries. This is higher-level transaction API.
///
/// Will error early if the a concurrent transaction has already been committed
/// and conflicts with this transaction.
Expand All @@ -161,14 +161,28 @@ pub async fn commit(
operation: DeltaOperation,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> DeltaResult<i64> {
commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await
}

/// Commit a transaction, with up configurable number of retries. This is higher-level transaction API.
///
/// The function will error early if the a concurrent transaction has already been committed
/// and conflicts with this transaction.
pub async fn commit_with_retries(
storage: &dyn ObjectStore,
actions: &Vec<Action>,
operation: DeltaOperation,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;

let max_attempts = 15;
let mut attempt_number = 1;

while attempt_number <= max_attempts {
let version = read_snapshot.version() + attempt_number;
while attempt_number <= max_retries {
let version = read_snapshot.version() + attempt_number as i64;
match try_commit_transaction(storage, &tmp_commit, version).await {
Ok(version) => return Ok(version),
Err(TransactionError::VersionAlreadyExists(version)) => {
Expand Down Expand Up @@ -199,7 +213,7 @@ pub async fn commit(
}
}

Err(TransactionError::MaxCommitAttempts(max_attempts as i32).into())
Err(TransactionError::MaxCommitAttempts(max_retries as i32).into())
}

#[cfg(all(test, feature = "parquet"))]
Expand Down