Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap retry policies #455

Merged
merged 4 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/Tingle.EventBus/Configuration/EventRegistration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Polly.Retry;
using Polly;
using Polly.Retry;
using Tingle.EventBus.Serialization;

namespace Tingle.EventBus.Configuration;
Expand Down Expand Up @@ -51,10 +52,9 @@ public EventRegistration(Type eventType)
public Type? EventSerializerType { get; set; }

/// <summary>
/// The retry policy to apply when in addition to what may be provided by the SDKs for each transport.
/// When set to <see langword="null"/>, no additional retry policy is applied.
/// Defaults to <see langword="null"/>.
/// When this value is set, it overrides the default value set on the transport or the bus.
/// The retry policy to apply specifically for this event.
/// This is in addition to what may be provided by the SDKs for each transport.
/// When provided alongside policies on the transport and the bus, it is used as the inner most policy.
/// </summary>
/// <remarks>
/// When a value is provided, the transport may extend the lock for the
Expand All @@ -78,6 +78,12 @@ public EventRegistration(Type eventType)
/// </summary>
public IDictionary<string, object> Metadata { get; set; } = new Dictionary<string, object>();

/// <summary>Whether the execution policies have been merged.</summary>
internal bool MergedExecutionPolicies { get; set; } = false;

/// <summary>The final policy used in executions for the event and it's consumers.</summary>
internal IAsyncPolicy ExecutionPolicy { get; set; } = Policy.NoOpAsync();

#region Equality Overrides

/// <inheritdoc/>
Expand Down
19 changes: 11 additions & 8 deletions src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ public class EventBusOptions
/// </summary>
public EventBusNamingOptions Naming { get; } = new EventBusNamingOptions();

/// <summary>
/// Optional retry policy to apply to the bus.
/// When provided alongside policies on the transport and the event registration, it is used as the putter most policy.
/// Defaults to <see langword="null"/>.
/// </summary>
/// <remarks>
/// To specify a value on an event registration, use <see cref="EventRegistration.RetryPolicy"/>.
/// To specify a value on a transport, use <see cref="EventBusTransportOptions.RetryPolicy"/> for the specific transport.
/// </remarks>
public AsyncRetryPolicy? RetryPolicy { get; set; }

/// <summary>
/// Indicates if the messages/events produced require guard against duplicate messages.
/// If <see langword="true"/>, duplicate messages having the same <see cref="EventContext.Id"/>
Expand Down Expand Up @@ -53,14 +64,6 @@ public class EventBusOptions
/// </summary>
public EventIdFormat DefaultEventIdFormat { get; set; } = EventIdFormat.Guid;

/// <summary>
/// Optional default retry policy to use where it is not specified.
/// To specify a value per consumer, use the <see cref="EventRegistration.RetryPolicy"/> option.
/// To specify a value per transport, use the <see cref="EventBusTransportOptions.DefaultRetryPolicy"/> option on the specific transport.
/// Defaults to <see langword="null"/>.
/// </summary>
public AsyncRetryPolicy? DefaultRetryPolicy { get; set; }

/// <summary>
/// Optional default behaviour for errors encountered in a consumer but are not handled.
/// To specify a value per consumer, use the <see cref="EventConsumerRegistration.UnhandledErrorBehaviour"/> option.
Expand Down
38 changes: 38 additions & 0 deletions src/Tingle.EventBus/PollyHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Microsoft.Extensions.DependencyInjection;
using Polly;
using Tingle.EventBus.Configuration;
using Tingle.EventBus.Transports;

namespace Tingle.EventBus;

internal static class PollyHelper
{
public static void Combine(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
{
if (busOptions is null) throw new ArgumentNullException(nameof(busOptions));
if (transportOptions is null) throw new ArgumentNullException(nameof(transportOptions));
if (registration is null) throw new ArgumentNullException(nameof(registration));

// if the policies have been merged, there is no need to repeat the process
if (registration.MergedExecutionPolicies) return;

registration.ExecutionPolicy = CombineInternal(busOptions, transportOptions, registration);
registration.MergedExecutionPolicies = true;
}

private static IAsyncPolicy CombineInternal(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
{
var policies = new IAsyncPolicy?[] {
busOptions.RetryPolicy, // outer
transportOptions.RetryPolicy,
registration.RetryPolicy, // inner
}.Where(p => p is not null).Select(p => p!).ToArray();

return policies.Length switch
{
0 => Policy.NoOpAsync(), // if there are none, return No-Op, if
1 => policies[0], // a single policy can just be used (no need to combine)
_ => Policy.WrapAsync(policies), // more than one needs to be combined
};
}
}
70 changes: 17 additions & 53 deletions src/Tingle.EventBus/Transports/EventBusTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
// Set publish retry policy
reg.RetryPolicy ??= Options.DefaultRetryPolicy;
reg.RetryPolicy ??= BusOptions.DefaultRetryPolicy;
// Combine the retry policies
PollyHelper.Combine(BusOptions, Options, reg);

foreach (var ecr in reg.Consumers)
{
Expand Down Expand Up @@ -111,16 +110,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
where TEvent : class
{
// publish, with retry if specified
var retryPolicy = registration.RetryPolicy;
if (retryPolicy != null)
{
return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}
else
{
return await PublishCoreAsync(@event, registration, scheduled, cancellationToken).ConfigureAwait(false);
}
// publish, with resilience policies
return await registration.ExecutionPolicy.ExecuteAsync(
ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand All @@ -130,16 +122,9 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
where TEvent : class
{
// publish, with retry if specified
var retryPolicy = registration.RetryPolicy;
if (retryPolicy != null)
{
return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}
else
{
return await PublishCoreAsync(events, registration, scheduled, cancellationToken).ConfigureAwait(false);
}
// publish, with resilience policies
return await registration.ExecutionPolicy.ExecuteAsync(
ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand All @@ -164,32 +149,18 @@ public virtual Task StopAsync(CancellationToken cancellationToken)
public virtual async Task CancelAsync<TEvent>(string id, EventRegistration registration, CancellationToken cancellationToken = default)
where TEvent : class
{
// cancel, with retry if specified
var retryPolicy = registration.RetryPolicy;
if (retryPolicy != null)
{
await retryPolicy.ExecuteAsync(ct => CancelCoreAsync<TEvent>(id, registration, ct), cancellationToken).ConfigureAwait(false);
}
else
{
await CancelCoreAsync<TEvent>(id, registration, cancellationToken).ConfigureAwait(false);
}
// cancel, with resilience policies
await registration.ExecutionPolicy.ExecuteAsync(
ct => CancelCoreAsync<TEvent>(id, registration, ct), cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public virtual async Task CancelAsync<TEvent>(IList<string> ids, EventRegistration registration, CancellationToken cancellationToken = default)
where TEvent : class
{
// cancel, with retry if specified
var retryPolicy = registration.RetryPolicy;
if (retryPolicy != null)
{
await retryPolicy.ExecuteAsync(ct => CancelCoreAsync<TEvent>(ids, registration, ct), cancellationToken).ConfigureAwait(false);
}
else
{
await CancelCoreAsync<TEvent>(ids, registration, cancellationToken).ConfigureAwait(false);
}
// cancel, with resilience policies
await registration.ExecutionPolicy.ExecuteAsync(
ct => CancelCoreAsync<TEvent>(ids, registration, ct), cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -332,16 +303,9 @@ protected async Task<EventConsumeResult> ConsumeAsync<TEvent, TConsumer>(EventRe
// Resolve the consumer
var consumer = ActivatorUtilities.GetServiceOrCreateInstance<TConsumer>(scope.ServiceProvider);

// Invoke handler method, with retry if specified
var retryPolicy = registration.RetryPolicy;
if (retryPolicy != null)
{
await retryPolicy.ExecuteAsync(ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false);
}
else
{
await consumer.ConsumeAsync(@event, cancellationToken).ConfigureAwait(false);
}
// Invoke handler method, with resilience policies
await registration.ExecutionPolicy.ExecuteAsync(
ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false);

return new EventConsumeResult(successful: true, exception: null);
}
Expand Down
19 changes: 12 additions & 7 deletions src/Tingle.EventBus/Transports/EventBusTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ namespace Tingle.EventBus.Transports;
/// </summary>
public abstract class EventBusTransportOptions
{
/// <summary>
/// Optional retry policy to apply specifically for this transport.
/// This is in addition to what may be provided by the transport SDKs.
/// When provided alongside policies on the bus and the event registration,
/// it is configured inner to the one on the bus and outer to the one on the event registration.
/// </summary>
/// <remarks>
/// To specify a value on an event registration, use <see cref="EventRegistration.RetryPolicy"/>.
/// To specify a value on the bus, use <see cref="EventBusOptions.RetryPolicy"/>.
/// </remarks>
public AsyncRetryPolicy? RetryPolicy { get; set; }

/// <summary>
/// The delay to introduce every time zero messages are received.
/// This eases on the CPU consumption and reduces the query costs.
Expand Down Expand Up @@ -48,13 +60,6 @@ public abstract class EventBusTransportOptions
/// </summary>
public EventIdFormat? DefaultEventIdFormat { get; set; }

/// <summary>
/// Optional default retry policy to use where it is not specified.
/// This value overrides the default value set on the bus via <see cref="EventBusOptions.DefaultRetryPolicy"/>.
/// To specify a value per consumer, use the <see cref="EventRegistration.RetryPolicy"/> option.
/// </summary>
public AsyncRetryPolicy? DefaultRetryPolicy { get; set; }

/// <summary>
/// Optional default behaviour for errors encountered in a consumer but are not handled.
/// This value overrides the default value set on the bus via <see cref="EventBusOptions.DefaultUnhandledConsumerErrorBehaviour"/>.
Expand Down