diff --git a/src/Tingle.EventBus/Configuration/EventRegistration.cs b/src/Tingle.EventBus/Configuration/EventRegistration.cs
index 095463fb..9d0da9cb 100644
--- a/src/Tingle.EventBus/Configuration/EventRegistration.cs
+++ b/src/Tingle.EventBus/Configuration/EventRegistration.cs
@@ -1,4 +1,5 @@
-using Polly.Retry;
+using Polly;
+using Polly.Retry;
using Tingle.EventBus.Serialization;
namespace Tingle.EventBus.Configuration;
@@ -51,10 +52,9 @@ public EventRegistration(Type eventType)
public Type? EventSerializerType { get; set; }
///
- /// The retry policy to apply when in addition to what may be provided by the SDKs for each transport.
- /// When set to , no additional retry policy is applied.
- /// Defaults to .
- /// When this value is set, it overrides the default value set on the transport or the bus.
+ /// The retry policy to apply specifically for this event.
+ /// This is in addition to what may be provided by the SDKs for each transport.
+ /// When provided alongside policies on the transport and the bus, it is used as the inner most policy.
///
///
/// When a value is provided, the transport may extend the lock for the
@@ -78,6 +78,12 @@ public EventRegistration(Type eventType)
///
public IDictionary Metadata { get; set; } = new Dictionary();
+ /// Whether the execution policies have been merged.
+ internal bool MergedExecutionPolicies { get; set; } = false;
+
+ /// The final policy used in executions for the event and it's consumers.
+ internal IAsyncPolicy ExecutionPolicy { get; set; } = Policy.NoOpAsync();
+
#region Equality Overrides
///
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
index 3161c2f8..0bf2c97b 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
@@ -24,6 +24,17 @@ public class EventBusOptions
///
public EventBusNamingOptions Naming { get; } = new EventBusNamingOptions();
+ ///
+ /// Optional retry policy to apply to the bus.
+ /// When provided alongside policies on the transport and the event registration, it is used as the putter most policy.
+ /// Defaults to .
+ ///
+ ///
+ /// To specify a value on an event registration, use .
+ /// To specify a value on a transport, use for the specific transport.
+ ///
+ public AsyncRetryPolicy? RetryPolicy { get; set; }
+
///
/// Indicates if the messages/events produced require guard against duplicate messages.
/// If , duplicate messages having the same
@@ -53,14 +64,6 @@ public class EventBusOptions
///
public EventIdFormat DefaultEventIdFormat { get; set; } = EventIdFormat.Guid;
- ///
- /// Optional default retry policy to use where it is not specified.
- /// To specify a value per consumer, use the option.
- /// To specify a value per transport, use the option on the specific transport.
- /// Defaults to .
- ///
- public AsyncRetryPolicy? DefaultRetryPolicy { get; set; }
-
///
/// Optional default behaviour for errors encountered in a consumer but are not handled.
/// To specify a value per consumer, use the option.
diff --git a/src/Tingle.EventBus/PollyHelper.cs b/src/Tingle.EventBus/PollyHelper.cs
new file mode 100644
index 00000000..1f11514a
--- /dev/null
+++ b/src/Tingle.EventBus/PollyHelper.cs
@@ -0,0 +1,38 @@
+using Microsoft.Extensions.DependencyInjection;
+using Polly;
+using Tingle.EventBus.Configuration;
+using Tingle.EventBus.Transports;
+
+namespace Tingle.EventBus;
+
+internal static class PollyHelper
+{
+ public static void Combine(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
+ {
+ if (busOptions is null) throw new ArgumentNullException(nameof(busOptions));
+ if (transportOptions is null) throw new ArgumentNullException(nameof(transportOptions));
+ if (registration is null) throw new ArgumentNullException(nameof(registration));
+
+ // if the policies have been merged, there is no need to repeat the process
+ if (registration.MergedExecutionPolicies) return;
+
+ registration.ExecutionPolicy = CombineInternal(busOptions, transportOptions, registration);
+ registration.MergedExecutionPolicies = true;
+ }
+
+ private static IAsyncPolicy CombineInternal(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
+ {
+ var policies = new IAsyncPolicy?[] {
+ busOptions.RetryPolicy, // outer
+ transportOptions.RetryPolicy,
+ registration.RetryPolicy, // inner
+ }.Where(p => p is not null).Select(p => p!).ToArray();
+
+ return policies.Length switch
+ {
+ 0 => Policy.NoOpAsync(), // if there are none, return No-Op, if
+ 1 => policies[0], // a single policy can just be used (no need to combine)
+ _ => Policy.WrapAsync(policies), // more than one needs to be combined
+ };
+ }
+}
diff --git a/src/Tingle.EventBus/Transports/EventBusTransport.cs b/src/Tingle.EventBus/Transports/EventBusTransport.cs
index f7572592..17204f1c 100644
--- a/src/Tingle.EventBus/Transports/EventBusTransport.cs
+++ b/src/Tingle.EventBus/Transports/EventBusTransport.cs
@@ -80,9 +80,8 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- // Set publish retry policy
- reg.RetryPolicy ??= Options.DefaultRetryPolicy;
- reg.RetryPolicy ??= BusOptions.DefaultRetryPolicy;
+ // Combine the retry policies
+ PollyHelper.Combine(BusOptions, Options, reg);
foreach (var ecr in reg.Consumers)
{
@@ -111,16 +110,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
where TEvent : class
{
- // publish, with retry if specified
- var retryPolicy = registration.RetryPolicy;
- if (retryPolicy != null)
- {
- return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
- }
- else
- {
- return await PublishCoreAsync(@event, registration, scheduled, cancellationToken).ConfigureAwait(false);
- }
+ // publish, with resilience policies
+ return await registration.ExecutionPolicy.ExecuteAsync(
+ ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}
///
@@ -130,16 +122,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
where TEvent : class
{
- // publish, with retry if specified
- var retryPolicy = registration.RetryPolicy;
- if (retryPolicy != null)
- {
- return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
- }
- else
- {
- return await PublishCoreAsync(events, registration, scheduled, cancellationToken).ConfigureAwait(false);
- }
+ // publish, with resilience policies
+ return await registration.ExecutionPolicy.ExecuteAsync(
+ ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}
///
@@ -164,32 +149,18 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
public virtual async Task CancelAsync(string id, EventRegistration registration, CancellationToken cancellationToken = default)
where TEvent : class
{
- // cancel, with retry if specified
- var retryPolicy = registration.RetryPolicy;
- if (retryPolicy != null)
- {
- await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false);
- }
- else
- {
- await CancelCoreAsync(id, registration, cancellationToken).ConfigureAwait(false);
- }
+ // cancel, with resilience policies
+ await registration.ExecutionPolicy.ExecuteAsync(
+ ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false);
}
///
public virtual async Task CancelAsync(IList ids, EventRegistration registration, CancellationToken cancellationToken = default)
where TEvent : class
{
- // cancel, with retry if specified
- var retryPolicy = registration.RetryPolicy;
- if (retryPolicy != null)
- {
- await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false);
- }
- else
- {
- await CancelCoreAsync(ids, registration, cancellationToken).ConfigureAwait(false);
- }
+ // cancel, with resilience policies
+ await registration.ExecutionPolicy.ExecuteAsync(
+ ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false);
}
///
@@ -332,16 +303,9 @@ protected async Task ConsumeAsync(EventRe
// Resolve the consumer
var consumer = ActivatorUtilities.GetServiceOrCreateInstance(scope.ServiceProvider);
- // Invoke handler method, with retry if specified
- var retryPolicy = registration.RetryPolicy;
- if (retryPolicy != null)
- {
- await retryPolicy.ExecuteAsync(ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false);
- }
- else
- {
- await consumer.ConsumeAsync(@event, cancellationToken).ConfigureAwait(false);
- }
+ // Invoke handler method, with resilience policies
+ await registration.ExecutionPolicy.ExecuteAsync(
+ ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false);
return new EventConsumeResult(successful: true, exception: null);
}
diff --git a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs
index 15e64301..d224a52c 100644
--- a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs
+++ b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs
@@ -9,6 +9,18 @@ namespace Tingle.EventBus.Transports;
///
public abstract class EventBusTransportOptions
{
+ ///
+ /// Optional retry policy to apply specifically for this transport.
+ /// This is in addition to what may be provided by the transport SDKs.
+ /// When provided alongside policies on the bus and the event registration,
+ /// it is configured inner to the one on the bus and outer to the one on the event registration.
+ ///
+ ///
+ /// To specify a value on an event registration, use .
+ /// To specify a value on the bus, use .
+ ///
+ public AsyncRetryPolicy? RetryPolicy { get; set; }
+
///
/// The delay to introduce every time zero messages are received.
/// This eases on the CPU consumption and reduces the query costs.
@@ -48,13 +60,6 @@ public abstract class EventBusTransportOptions
///
public EventIdFormat? DefaultEventIdFormat { get; set; }
- ///
- /// Optional default retry policy to use where it is not specified.
- /// This value overrides the default value set on the bus via .
- /// To specify a value per consumer, use the option.
- ///
- public AsyncRetryPolicy? DefaultRetryPolicy { get; set; }
-
///
/// Optional default behaviour for errors encountered in a consumer but are not handled.
/// This value overrides the default value set on the bus via .