Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Ability to override StartDurationThreshold to change the duration… #5

Merged
merged 1 commit into from
Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,16 @@ endpointConfiguration.ApplyRateLimiting(limitPerSecond:10, burstSize:250);
```

Using this logic to cope with sudden peaks of incoming messages but still maintain an average rate of messages per second.

### Example: Disable duration log entry message

By default, is processing does not start within 5 seconds a message is written to the log. This can be disabled.

```c#
endpointConfiguration.ApplyRateLimiting(cfg=>{
cfg.Duration = TimeSpan.FromSeconds(1),
cfg.Limit = 10,
cfg.StartDurationThreshold = Timeout.InfiniteTimeSpan; // or TimeSpan.FromMilliseconds(-1);
cfg.
});
```
34 changes: 23 additions & 11 deletions src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,37 @@
sealed class RateLimitBehavior : IBehavior<ITransportReceiveContext, ITransportReceiveContext>
{
readonly ILog Log = LogManager.GetLogger("RateLimiter");
readonly TimeSpan WarningThresshold = TimeSpan.FromSeconds(5);
readonly bool IsInfoEnabled;
readonly TimeSpan StartDurationThreshold;
readonly RateGate Gate;

public RateLimitBehavior(int occurences, TimeSpan duration)
public RateLimitBehavior(int occurrences, TimeSpan duration, TimeSpan warningThreshold)
{
Gate = new RateGate(occurences, duration);
Gate = new RateGate(occurrences, duration);
StartDurationThreshold = warningThreshold;
IsInfoEnabled = Log.IsInfoEnabled;
}

public async Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
{
var start = Stopwatch.StartNew();
await Gate.WaitAsync().ConfigureAwait(false);
var duration = start.Elapsed;
if (duration > WarningThresshold)
if (IsInfoEnabled && StartDurationThreshold > TimeSpan.Zero)
{
Log.InfoFormat("Message '{0}' delayed by {1:g} due to throttling. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration.",
context.Message.MessageId,
duration
);
var start = Stopwatch.StartNew();
await Gate.WaitAsync().ConfigureAwait(false);
var duration = start.Elapsed;
if (duration > StartDurationThreshold)
{
Log.InfoFormat(
"Message '{0}' delayed due to throttling by {1:g} which is more than the configured `WarningThresshold` value {2:g}. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration. ",
context.Message.MessageId,
duration,
StartDurationThreshold
);
}
}
else
{
await Gate.WaitAsync().ConfigureAwait(false);
}
await next(context).ConfigureAwait(false);
}
Expand Down
7 changes: 0 additions & 7 deletions src/NServiceBus.RateLimiter/Properties.cs

This file was deleted.

20 changes: 20 additions & 0 deletions src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

/// <summary>
/// Rate limiter configuration
/// </summary>
public sealed class RateLimiterConfiguration
{
/// <summary>
/// The duration after which the configured rate Limit is reset.
/// </summary>
public TimeSpan Duration { get; set; }
/// <summary>
/// The maximum number of message to process within the configured Duration.
/// </summary>
public int Limit { get; set; }
/// <summary>
/// The duration warning threshold after which a Informational log entry if logged to raise awareness of an imbalance between configured rate limit and concurrency. Set to -1 milliseconds to disable.
/// </summary>
public TimeSpan StartDurationThreshold { get; set; } = TimeSpan.FromSeconds(5);
}
38 changes: 28 additions & 10 deletions src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Logging;

namespace NServiceBus
{
Expand All @@ -26,15 +25,15 @@ public static class RateLimiterConfigurationExtension
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond, int burstSize)
{
if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0");
var properties = new global::Properties
var properties = new global::RateLimiterConfiguration
{
Duration = TimeSpan.FromSeconds(1.0 / limitPerSecond * burstSize),
Limit = burstSize,
};
var minimumBurstSize = limitPerSecond / 10D;
if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}.");
if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}");
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

Expand All @@ -49,14 +48,14 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limit, TimeSpan duration)
{
if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit), limit, "Must be larger than 0");
if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms.");
var properties = new global::Properties
if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms");
var properties = new global::RateLimiterConfiguration
{
Duration = duration,
Limit = limit,
};
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

Expand All @@ -71,14 +70,33 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond)
{
if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0");
var properties = new global::Properties
var properties = new global::RateLimiterConfiguration
{
Duration = TimeSpan.FromSeconds(1),
Limit = limitPerSecond,
};
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

/// <summary>
/// Enable rate limiting.
/// </summary>
public static void ApplyRateLimiting(this EndpointConfiguration instance, Action<RateLimiterConfiguration> config)
{
if (config == null) throw new ArgumentNullException(nameof(config));

var properties = new RateLimiterConfiguration();
config(properties);

if (properties.Limit < 1) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Limit), properties.Limit, "Must be larger than 0");
if (properties.Duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Duration), properties.Duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms");
if (properties.StartDurationThreshold == TimeSpan.Zero || properties.StartDurationThreshold < System.Threading.Timeout.InfiniteTimeSpan) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.StartDurationThreshold), properties.StartDurationThreshold, $"Must be larger than 0 or -1ms to disable");

var settings = instance.GetSettings();
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}
}
}
}
4 changes: 2 additions & 2 deletions src/NServiceBus.RateLimiter/RateLimiterFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ sealed class RateLimiterFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
if (!context.Settings.TryGet<Properties>(out var properties)) return;
if (!context.Settings.TryGet<RateLimiterConfiguration>(out var properties)) return;
LogManager.GetLogger("RateLimiter").InfoFormat("Rate limiter configuration: Limit:{0:N0} Duration:{1:g}", properties.Limit, properties.Duration);
context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration), description: nameof(RateLimitBehavior));
context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration, properties.StartDurationThreshold), description: nameof(RateLimitBehavior));
}
}