diff --git a/src/Tingle.EventBus/Configuration/EventRegistration.cs b/src/Tingle.EventBus/Configuration/EventRegistration.cs index 095463fb..9d0da9cb 100644 --- a/src/Tingle.EventBus/Configuration/EventRegistration.cs +++ b/src/Tingle.EventBus/Configuration/EventRegistration.cs @@ -1,4 +1,5 @@ -using Polly.Retry; +using Polly; +using Polly.Retry; using Tingle.EventBus.Serialization; namespace Tingle.EventBus.Configuration; @@ -51,10 +52,9 @@ public EventRegistration(Type eventType) public Type? EventSerializerType { get; set; } /// - /// The retry policy to apply when in addition to what may be provided by the SDKs for each transport. - /// When set to , no additional retry policy is applied. - /// Defaults to . - /// When this value is set, it overrides the default value set on the transport or the bus. + /// The retry policy to apply specifically for this event. + /// This is in addition to what may be provided by the SDKs for each transport. + /// When provided alongside policies on the transport and the bus, it is used as the inner most policy. /// /// /// When a value is provided, the transport may extend the lock for the @@ -78,6 +78,12 @@ public EventRegistration(Type eventType) /// public IDictionary Metadata { get; set; } = new Dictionary(); + /// Whether the execution policies have been merged. + internal bool MergedExecutionPolicies { get; set; } = false; + + /// The final policy used in executions for the event and it's consumers. + internal IAsyncPolicy ExecutionPolicy { get; set; } = Policy.NoOpAsync(); + #region Equality Overrides /// diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs index 3161c2f8..0bf2c97b 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs @@ -24,6 +24,17 @@ public class EventBusOptions /// public EventBusNamingOptions Naming { get; } = new EventBusNamingOptions(); + /// + /// Optional retry policy to apply to the bus. + /// When provided alongside policies on the transport and the event registration, it is used as the putter most policy. + /// Defaults to . + /// + /// + /// To specify a value on an event registration, use . + /// To specify a value on a transport, use for the specific transport. + /// + public AsyncRetryPolicy? RetryPolicy { get; set; } + /// /// Indicates if the messages/events produced require guard against duplicate messages. /// If , duplicate messages having the same @@ -53,14 +64,6 @@ public class EventBusOptions /// public EventIdFormat DefaultEventIdFormat { get; set; } = EventIdFormat.Guid; - /// - /// Optional default retry policy to use where it is not specified. - /// To specify a value per consumer, use the option. - /// To specify a value per transport, use the option on the specific transport. - /// Defaults to . - /// - public AsyncRetryPolicy? DefaultRetryPolicy { get; set; } - /// /// Optional default behaviour for errors encountered in a consumer but are not handled. /// To specify a value per consumer, use the option. diff --git a/src/Tingle.EventBus/PollyHelper.cs b/src/Tingle.EventBus/PollyHelper.cs new file mode 100644 index 00000000..1f11514a --- /dev/null +++ b/src/Tingle.EventBus/PollyHelper.cs @@ -0,0 +1,38 @@ +using Microsoft.Extensions.DependencyInjection; +using Polly; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Transports; + +namespace Tingle.EventBus; + +internal static class PollyHelper +{ + public static void Combine(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration) + { + if (busOptions is null) throw new ArgumentNullException(nameof(busOptions)); + if (transportOptions is null) throw new ArgumentNullException(nameof(transportOptions)); + if (registration is null) throw new ArgumentNullException(nameof(registration)); + + // if the policies have been merged, there is no need to repeat the process + if (registration.MergedExecutionPolicies) return; + + registration.ExecutionPolicy = CombineInternal(busOptions, transportOptions, registration); + registration.MergedExecutionPolicies = true; + } + + private static IAsyncPolicy CombineInternal(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration) + { + var policies = new IAsyncPolicy?[] { + busOptions.RetryPolicy, // outer + transportOptions.RetryPolicy, + registration.RetryPolicy, // inner + }.Where(p => p is not null).Select(p => p!).ToArray(); + + return policies.Length switch + { + 0 => Policy.NoOpAsync(), // if there are none, return No-Op, if + 1 => policies[0], // a single policy can just be used (no need to combine) + _ => Policy.WrapAsync(policies), // more than one needs to be combined + }; + } +} diff --git a/src/Tingle.EventBus/Transports/EventBusTransport.cs b/src/Tingle.EventBus/Transports/EventBusTransport.cs index f7572592..17204f1c 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransport.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransport.cs @@ -80,9 +80,8 @@ public virtual Task StartAsync(CancellationToken cancellationToken) var registrations = GetRegistrations(); foreach (var reg in registrations) { - // Set publish retry policy - reg.RetryPolicy ??= Options.DefaultRetryPolicy; - reg.RetryPolicy ??= BusOptions.DefaultRetryPolicy; + // Combine the retry policies + PollyHelper.Combine(BusOptions, Options, reg); foreach (var ecr in reg.Consumers) { @@ -111,16 +110,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) where TEvent : class { - // publish, with retry if specified - var retryPolicy = registration.RetryPolicy; - if (retryPolicy != null) - { - return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); - } - else - { - return await PublishCoreAsync(@event, registration, scheduled, cancellationToken).ConfigureAwait(false); - } + // publish, with resilience policies + return await registration.ExecutionPolicy.ExecuteAsync( + ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); } /// @@ -130,16 +122,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) where TEvent : class { - // publish, with retry if specified - var retryPolicy = registration.RetryPolicy; - if (retryPolicy != null) - { - return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); - } - else - { - return await PublishCoreAsync(events, registration, scheduled, cancellationToken).ConfigureAwait(false); - } + // publish, with resilience policies + return await registration.ExecutionPolicy.ExecuteAsync( + ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); } /// @@ -164,32 +149,18 @@ public virtual Task StopAsync(CancellationToken cancellationToken) public virtual async Task CancelAsync(string id, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { - // cancel, with retry if specified - var retryPolicy = registration.RetryPolicy; - if (retryPolicy != null) - { - await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false); - } - else - { - await CancelCoreAsync(id, registration, cancellationToken).ConfigureAwait(false); - } + // cancel, with resilience policies + await registration.ExecutionPolicy.ExecuteAsync( + ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false); } /// public virtual async Task CancelAsync(IList ids, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { - // cancel, with retry if specified - var retryPolicy = registration.RetryPolicy; - if (retryPolicy != null) - { - await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false); - } - else - { - await CancelCoreAsync(ids, registration, cancellationToken).ConfigureAwait(false); - } + // cancel, with resilience policies + await registration.ExecutionPolicy.ExecuteAsync( + ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false); } /// @@ -332,16 +303,9 @@ protected async Task ConsumeAsync(EventRe // Resolve the consumer var consumer = ActivatorUtilities.GetServiceOrCreateInstance(scope.ServiceProvider); - // Invoke handler method, with retry if specified - var retryPolicy = registration.RetryPolicy; - if (retryPolicy != null) - { - await retryPolicy.ExecuteAsync(ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false); - } - else - { - await consumer.ConsumeAsync(@event, cancellationToken).ConfigureAwait(false); - } + // Invoke handler method, with resilience policies + await registration.ExecutionPolicy.ExecuteAsync( + ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false); return new EventConsumeResult(successful: true, exception: null); } diff --git a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs index 15e64301..d224a52c 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs @@ -9,6 +9,18 @@ namespace Tingle.EventBus.Transports; /// public abstract class EventBusTransportOptions { + /// + /// Optional retry policy to apply specifically for this transport. + /// This is in addition to what may be provided by the transport SDKs. + /// When provided alongside policies on the bus and the event registration, + /// it is configured inner to the one on the bus and outer to the one on the event registration. + /// + /// + /// To specify a value on an event registration, use . + /// To specify a value on the bus, use . + /// + public AsyncRetryPolicy? RetryPolicy { get; set; } + /// /// The delay to introduce every time zero messages are received. /// This eases on the CPU consumption and reduces the query costs. @@ -48,13 +60,6 @@ public abstract class EventBusTransportOptions /// public EventIdFormat? DefaultEventIdFormat { get; set; } - /// - /// Optional default retry policy to use where it is not specified. - /// This value overrides the default value set on the bus via . - /// To specify a value per consumer, use the option. - /// - public AsyncRetryPolicy? DefaultRetryPolicy { get; set; } - /// /// Optional default behaviour for errors encountered in a consumer but are not handled. /// This value overrides the default value set on the bus via .