Skip to content

Commit

Permalink
use common retry policy builder function
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiogenesis committed Sep 11, 2024
1 parent 6635621 commit cc489f3
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,8 @@ internal virtual async Task<Exception> InternalApplyTableChangesAsync(ScopeInfo

this.Logger.LogInformation("[InternalApplyTableChangesAsync]. Directory name {DirectoryName}. BatchParts count {BatchPartsInfoCount}", message.Changes.DirectoryName, message.Changes.BatchPartsInfo.Count);

// If we have a transient error happening, and we are rerunning the tranaction,
// raising an interceptor
var onRetry = new Func<Exception, int, TimeSpan, object, Task>((ex, cpt, ts, arg) =>
this.InterceptAsync(new TransientErrorOccuredArgs(context, connection, ex, cpt, ts), progress, cancellationToken).AsTask());

// Defining my retry policy
var retryPolicy = this.Options.TransactionMode != TransactionMode.AllOrNothing
? SyncPolicy.WaitAndRetryForever(retryAttempt => TimeSpan.FromMilliseconds(500 * retryAttempt), (ex, arg) => this.Provider.ShouldRetryOn(ex), onRetry)
: SyncPolicy.WaitAndRetry(0, TimeSpan.Zero);
var retryPolicy = this.BuildSyncPolicy(context, connection, progress, cancellationToken);

var applyChangesPolicyResult = await retryPolicy.ExecuteAsync(
async () =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,8 @@ public partial class LocalOrchestrator : BaseOrchestrator
DbConnection connection = default, DbTransaction transaction = default,
IProgress<ProgressArgs> progress = null, CancellationToken cancellationToken = default)
{
// If we have a transient error happening, and we are rerunning the tranaction,
// raising an interceptor
var onRetry = new Func<Exception, int, TimeSpan, object, Task>((ex, cpt, ts, arg) =>
this.InterceptAsync(new TransientErrorOccuredArgs(context, connection, ex, cpt, ts), progress, cancellationToken).AsTask());

// Defining my retry policy
SyncPolicy retryPolicy = this.Options.TransactionMode == TransactionMode.AllOrNothing
? retryPolicy = SyncPolicy.WaitAndRetryForever(retryAttempt => TimeSpan.FromMilliseconds(500 * retryAttempt), (ex, arg) => this.Provider.ShouldRetryOn(ex), onRetry)
: retryPolicy = SyncPolicy.WaitAndRetry(0, TimeSpan.Zero);
var retryPolicy = this.BuildSyncPolicy(context, connection, progress, cancellationToken);

// Execute my OpenAsync in my policy context
var applyChangesResult = await retryPolicy.ExecuteAsync<(SyncContext Context, ClientSyncChanges ClientSyncChange, ScopeInfoClient CScopeInfoClient)>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,8 @@ public partial class RemoteOrchestrator : BaseOrchestrator
// Previous sync errors
BatchInfo lastSyncErrorsBatchInfo = null;

// If we have a transient error happening, and we are rerunning the tranaction,
// raising an interceptor
var onRetry = new Func<Exception, int, TimeSpan, object, Task>((ex, cpt, ts, arg) =>
this.InterceptAsync(new TransientErrorOccuredArgs(context, connection, ex, cpt, ts), progress, cancellationToken).AsTask());

// Defining my retry policy
SyncPolicy retryPolicy = this.Options.TransactionMode == TransactionMode.AllOrNothing
? retryPolicy = SyncPolicy.WaitAndRetryForever(retryAttempt => TimeSpan.FromMilliseconds(500 * retryAttempt), (ex, arg) => this.Provider.ShouldRetryOn(ex), onRetry)
: retryPolicy = SyncPolicy.WaitAndRetry(0, TimeSpan.Zero);
var retryPolicy = this.BuildSyncPolicy(context, connection, progress, cancellationToken);

await retryPolicy.ExecuteAsync(
async () =>
Expand Down
16 changes: 16 additions & 0 deletions Projects/Dotmim.Sync.Core/Orchestrators/BaseOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,5 +445,21 @@ internal SyncException GetSyncError(SyncContext context, Exception exception, st
throw this.GetSyncError(context, ex);
}
}

protected SyncPolicy BuildSyncPolicy(SyncContext context, DbConnection connection, IProgress<ProgressArgs> progress, CancellationToken cancellationToken, int retryIntervalMilliseconds = 500)
{
// If we have a transient error happening, and we are rerunning the transaction,
// raising an interceptor
var onRetry = new Func<Exception, int, TimeSpan, object, Task>((ex, cpt, ts, arg) =>
this.InterceptAsync(new TransientErrorOccuredArgs(context, connection, ex, cpt, ts), progress, cancellationToken).AsTask());

return this.Options.TransactionMode switch
{
TransactionMode.AllOrNothing => SyncPolicy.WaitAndRetry(0, TimeSpan.Zero),
TransactionMode.PerBatch => SyncPolicy.WaitAndRetryForever(retryAttempt => TimeSpan.FromMilliseconds(retryIntervalMilliseconds * retryAttempt), (ex, arg) => this.Provider.ShouldRetryOn(ex), onRetry),
TransactionMode.None => SyncPolicy.WaitAndRetryForever(retryAttempt => TimeSpan.FromMilliseconds(retryIntervalMilliseconds * retryAttempt), (ex, arg) => this.Provider.ShouldRetryOn(ex), onRetry),
_ => throw new NotImplementedException(),
};
}
}
}

0 comments on commit cc489f3

Please sign in to comment.