Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clenaup rate limiter API #1509

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Polly.RateLimiting/DisposeWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Threading.RateLimiting;

namespace Polly.RateLimiting;

internal sealed class DisposeWrapper : IDisposable, IAsyncDisposable
{
internal DisposeWrapper(RateLimiter limiter) => Limiter = limiter;

public RateLimiter Limiter { get; }

public ValueTask DisposeAsync() => Limiter.DisposeAsync();

public void Dispose() => Limiter.Dispose();
}
11 changes: 5 additions & 6 deletions src/Polly.RateLimiting/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ Polly.RateLimiting.OnRateLimiterRejectedArguments.Context.get -> Polly.Resilienc
Polly.RateLimiting.OnRateLimiterRejectedArguments.Lease.get -> System.Threading.RateLimiting.RateLimitLease!
Polly.RateLimiting.OnRateLimiterRejectedArguments.OnRateLimiterRejectedArguments() -> void
Polly.RateLimiting.OnRateLimiterRejectedArguments.OnRateLimiterRejectedArguments(Polly.ResilienceContext! context, System.Threading.RateLimiting.RateLimitLease! lease) -> void
Polly.RateLimiting.RateLimiterArguments
Polly.RateLimiting.RateLimiterArguments.Context.get -> Polly.ResilienceContext!
Polly.RateLimiting.RateLimiterArguments.RateLimiterArguments() -> void
Polly.RateLimiting.RateLimiterArguments.RateLimiterArguments(Polly.ResilienceContext! context) -> void
Polly.RateLimiting.RateLimiterRejectedException
Polly.RateLimiting.RateLimiterRejectedException.RateLimiterRejectedException() -> void
Polly.RateLimiting.RateLimiterRejectedException.RateLimiterRejectedException(string! message) -> void
Expand All @@ -18,15 +22,10 @@ Polly.RateLimiting.RateLimiterStrategyOptions.DefaultRateLimiterOptions.get -> S
Polly.RateLimiting.RateLimiterStrategyOptions.DefaultRateLimiterOptions.set -> void
Polly.RateLimiting.RateLimiterStrategyOptions.OnRejected.get -> System.Func<Polly.RateLimiting.OnRateLimiterRejectedArguments, System.Threading.Tasks.ValueTask>?
Polly.RateLimiting.RateLimiterStrategyOptions.OnRejected.set -> void
Polly.RateLimiting.RateLimiterStrategyOptions.RateLimiter.get -> Polly.RateLimiting.ResilienceRateLimiter?
Polly.RateLimiting.RateLimiterStrategyOptions.RateLimiter.get -> System.Func<Polly.RateLimiting.RateLimiterArguments, System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease!>>?
Polly.RateLimiting.RateLimiterStrategyOptions.RateLimiter.set -> void
Polly.RateLimiting.RateLimiterStrategyOptions.RateLimiterStrategyOptions() -> void
Polly.RateLimiting.ResilienceRateLimiter
static Polly.RateLimiterResiliencePipelineBuilderExtensions.AddConcurrencyLimiter<TBuilder>(this TBuilder! builder, int permitLimit, int queueLimit = 0) -> TBuilder!
static Polly.RateLimiterResiliencePipelineBuilderExtensions.AddConcurrencyLimiter<TBuilder>(this TBuilder! builder, System.Threading.RateLimiting.ConcurrencyLimiterOptions! options) -> TBuilder!
static Polly.RateLimiterResiliencePipelineBuilderExtensions.AddRateLimiter<TBuilder>(this TBuilder! builder, Polly.RateLimiting.RateLimiterStrategyOptions! options) -> TBuilder!
static Polly.RateLimiterResiliencePipelineBuilderExtensions.AddRateLimiter<TBuilder>(this TBuilder! builder, System.Threading.RateLimiting.RateLimiter! limiter) -> TBuilder!
Polly.RateLimiting.ResilienceRateLimiter.Dispose() -> void
Polly.RateLimiting.ResilienceRateLimiter.DisposeAsync() -> System.Threading.Tasks.ValueTask
static Polly.RateLimiting.ResilienceRateLimiter.Create(System.Threading.RateLimiting.PartitionedRateLimiter<Polly.ResilienceContext!>! rateLimiter) -> Polly.RateLimiting.ResilienceRateLimiter!
static Polly.RateLimiting.ResilienceRateLimiter.Create(System.Threading.RateLimiting.RateLimiter! rateLimiter) -> Polly.RateLimiting.ResilienceRateLimiter!
20 changes: 20 additions & 0 deletions src/Polly.RateLimiting/RateLimiterArguments.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Polly.RateLimiting;

#pragma warning disable CA1815 // Override equals and operator equals on value types

/// <summary>
/// The arguments used by the <see cref="RateLimiterStrategyOptions.RateLimiter"/> delegate.
/// </summary>
public readonly struct RateLimiterArguments
{
/// <summary>
/// Initializes a new instance of the <see cref="RateLimiterArguments"/> struct.
/// </summary>
/// <param name="context">Context associated with the execution of a user-provided callback.</param>
public RateLimiterArguments(ResilienceContext context) => Context = context;

/// <summary>
/// Gets the context associated with the execution of a user-provided callback.
/// </summary>
public ResilienceContext Context { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static TBuilder AddRateLimiter<TBuilder>(

return builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = ResilienceRateLimiter.Create(limiter),
RateLimiter = args => limiter.AcquireAsync(1, args.Context.CancellationToken),
});
}

Expand Down Expand Up @@ -109,10 +109,20 @@ public static TBuilder AddRateLimiter<TBuilder>(
return builder.AddStrategy(
context =>
{
DisposeWrapper? wrapper = default;
var limiter = options.RateLimiter;
if (limiter is null)
{
var defaultLimiter = new ConcurrencyLimiter(options.DefaultRateLimiterOptions);
wrapper = new DisposeWrapper(defaultLimiter);
limiter = args => defaultLimiter.AcquireAsync(1, args.Context.CancellationToken);
}

return new RateLimiterResilienceStrategy(
options.RateLimiter ?? ResilienceRateLimiter.Create(new ConcurrencyLimiter(options.DefaultRateLimiterOptions)),
limiter,
options.OnRejected,
context.Telemetry);
context.Telemetry,
wrapper);
},
options);
}
Expand Down
24 changes: 18 additions & 6 deletions src/Polly.RateLimiting/RateLimiterResilienceStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,42 @@ internal sealed class RateLimiterResilienceStrategy : ResilienceStrategy, IDispo
private readonly ResilienceStrategyTelemetry _telemetry;

public RateLimiterResilienceStrategy(
ResilienceRateLimiter limiter,
Func<RateLimiterArguments, ValueTask<RateLimitLease>> limiter,
Func<OnRateLimiterRejectedArguments, ValueTask>? onRejected,
ResilienceStrategyTelemetry telemetry)
ResilienceStrategyTelemetry telemetry,
DisposeWrapper? wrapper)
{
Limiter = limiter;
OnLeaseRejected = onRejected;

_telemetry = telemetry;
Wrapper = wrapper;
}

public ResilienceRateLimiter Limiter { get; }
public Func<RateLimiterArguments, ValueTask<RateLimitLease>> Limiter { get; }

public Func<OnRateLimiterRejectedArguments, ValueTask>? OnLeaseRejected { get; }

public void Dispose() => Limiter.Dispose();
public DisposeWrapper? Wrapper { get; }

public ValueTask DisposeAsync() => Limiter.DisposeAsync();
public void Dispose() => Wrapper?.Dispose();

public ValueTask DisposeAsync()
{
if (Wrapper is not null)
{
return Wrapper.DisposeAsync();
}

return default;
}

protected override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
using var lease = await Limiter.AcquireAsync(context).ConfigureAwait(context.ContinueOnCapturedContext);
using var lease = await Limiter(new RateLimiterArguments(context)).ConfigureAwait(context.ContinueOnCapturedContext);

if (lease.IsAcquired)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Polly.RateLimiting/RateLimiterStrategyOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public class RateLimiterStrategyOptions : ResilienceStrategyOptions
public Func<OnRateLimiterRejectedArguments, ValueTask>? OnRejected { get; set; }

/// <summary>
/// Gets or sets a rate limiter used by the strategy.
/// Gets or sets a rate limiter delegate that produces <see cref="RateLimitLease"/>.
/// </summary>
/// <value>
/// The default value is <see langword="null"/>. If this property is <see langword="null"/>, then the strategy
/// will use a <see cref="ConcurrencyLimiter"/> created using <see cref="DefaultRateLimiterOptions"/>.
/// </value>
public ResilienceRateLimiter? RateLimiter { get; set; }
public Func<RateLimiterArguments, ValueTask<RateLimitLease>>? RateLimiter { get; set; }
martincostello marked this conversation as resolved.
Show resolved Hide resolved
}
71 changes: 0 additions & 71 deletions src/Polly.RateLimiting/ResilienceRateLimiter.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async void PartitionedRateLimiter_EnsureUserLimited_1365()

builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = ResilienceRateLimiter.Create(partitionedLimiter)
RateLimiter = args => partitionedLimiter.AcquireAsync(args.Context, 1, args.Context.CancellationToken)
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class RateLimiterResiliencePipelineBuilderExtensionsTests
builder =>
{
builder.AddConcurrencyLimiter(2, 2);
AssertRateLimiterStrategy(builder, strategy => strategy.Limiter.Limiter.Should().BeOfType<ConcurrencyLimiter>());
AssertRateLimiterStrategy(builder, strategy => strategy.Wrapper!.Limiter.Should().BeOfType<ConcurrencyLimiter>());
},
builder =>
{
Expand All @@ -25,13 +25,20 @@ public class RateLimiterResiliencePipelineBuilderExtensionsTests
QueueLimit = 2
});

AssertRateLimiterStrategy(builder, strategy => strategy.Limiter.Limiter.Should().BeOfType<ConcurrencyLimiter>());
AssertRateLimiterStrategy(builder, strategy => strategy.Wrapper!.Limiter.Should().BeOfType<ConcurrencyLimiter>());
},
builder =>
{
var expected = Substitute.For<RateLimiter>();
builder.AddRateLimiter(expected);
AssertRateLimiterStrategy(builder, strategy => strategy.Limiter.Limiter.Should().Be(expected));
AssertRateLimiterStrategy(builder, strategy => strategy.Wrapper.Should().BeNull());
},
builder =>
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions { PermitLimit = 1 });
builder.AddRateLimiter(limiter);
builder.Build().Execute(() => { });
AssertRateLimiterStrategy(builder, strategy => strategy.Wrapper.Should().BeNull());
}
};

Expand Down Expand Up @@ -85,7 +92,7 @@ public void AddRateLimiter_Ok()
new ResiliencePipelineBuilder()
.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = ResilienceRateLimiter.Create(limiter)
RateLimiter = args => limiter.AcquireAsync(1, args.Context.CancellationToken)
})
.Build()
.GetPipelineDescriptor()
Expand Down Expand Up @@ -129,7 +136,7 @@ public void AddRateLimiter_Options_Ok()
var strategy = new ResiliencePipelineBuilder()
.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = ResilienceRateLimiter.Create(Substitute.For<RateLimiter>())
RateLimiter = args => new ValueTask<RateLimitLease>(Substitute.For<RateLimitLease>())
})
.Build()
.GetPipelineDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,44 @@ public async Task Execute_LeaseRejected(bool hasEvents, bool hasRetryAfter)
_listener.GetArgs<OnRateLimiterRejectedArguments>().Should().HaveCount(1);
}

[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task Dispose_DisposableResourcesShouldBeDisposed(bool isAsync)
{
using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions { PermitLimit = 1 });
using var wrapper = new DisposeWrapper(limiter);
var strategy = new RateLimiterResilienceStrategy(null!, null, null!, wrapper);

if (isAsync)
{
await strategy.DisposeAsync();
}
else
{
strategy.Dispose();
}

await limiter.Invoking(l => l.AcquireAsync(1).AsTask()).Should().ThrowAsync<ObjectDisposedException>();
}

[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task Dispose_NoDisposableResources_ShouldNotThrow(bool isAsync)
{
using var strategy = new RateLimiterResilienceStrategy(null!, null, null!, null);

if (isAsync)
{
await strategy.Invoking(s => s.DisposeAsync().AsTask()).Should().NotThrowAsync();
}
else
{
strategy.Invoking(s => s.Dispose()).Should().NotThrow();
}
}

private void SetupLimiter(CancellationToken token)
{
var result = new ValueTask<RateLimitLease>(_lease);
Expand All @@ -107,7 +145,7 @@ private ResiliencePipeline Create()

return builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = ResilienceRateLimiter.Create(_limiter),
RateLimiter = args => _limiter.AcquireAsync(1, args.Context.CancellationToken),
OnRejected = _event
})
.Build();
Expand Down
Loading