Skip to content

Commit

Permalink
✨ Ability to override StartDurationThreshold to change the duration… (
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonsmits authored Jul 16, 2023
1 parent ee5cbdf commit 6794614
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,16 @@ endpointConfiguration.ApplyRateLimiting(limitPerSecond:10, burstSize:250);
```

Using this logic to cope with sudden peaks of incoming messages but still maintain an average rate of messages per second.

### Example: Disable duration log entry message

By default, is processing does not start within 5 seconds a message is written to the log. This can be disabled.

```c#
endpointConfiguration.ApplyRateLimiting(cfg=>{
cfg.Duration = TimeSpan.FromSeconds(1),
cfg.Limit = 10,
cfg.StartDurationThreshold = Timeout.InfiniteTimeSpan; // or TimeSpan.FromMilliseconds(-1);
cfg.
});
```
34 changes: 23 additions & 11 deletions src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,37 @@
sealed class RateLimitBehavior : IBehavior<ITransportReceiveContext, ITransportReceiveContext>
{
readonly ILog Log = LogManager.GetLogger("RateLimiter");
readonly TimeSpan WarningThresshold = TimeSpan.FromSeconds(5);
readonly bool IsInfoEnabled;
readonly TimeSpan StartDurationThreshold;
readonly RateGate Gate;

public RateLimitBehavior(int occurences, TimeSpan duration)
public RateLimitBehavior(int occurrences, TimeSpan duration, TimeSpan warningThreshold)
{
Gate = new RateGate(occurences, duration);
Gate = new RateGate(occurrences, duration);
StartDurationThreshold = warningThreshold;
IsInfoEnabled = Log.IsInfoEnabled;
}

public async Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
{
var start = Stopwatch.StartNew();
await Gate.WaitAsync().ConfigureAwait(false);
var duration = start.Elapsed;
if (duration > WarningThresshold)
if (IsInfoEnabled && StartDurationThreshold > TimeSpan.Zero)
{
Log.InfoFormat("Message '{0}' delayed by {1:g} due to throttling. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration.",
context.Message.MessageId,
duration
);
var start = Stopwatch.StartNew();
await Gate.WaitAsync().ConfigureAwait(false);
var duration = start.Elapsed;
if (duration > StartDurationThreshold)
{
Log.InfoFormat(
"Message '{0}' delayed due to throttling by {1:g} which is more than the configured `WarningThresshold` value {2:g}. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration. ",
context.Message.MessageId,
duration,
StartDurationThreshold
);
}
}
else
{
await Gate.WaitAsync().ConfigureAwait(false);
}
await next(context).ConfigureAwait(false);
}
Expand Down
7 changes: 0 additions & 7 deletions src/NServiceBus.RateLimiter/Properties.cs

This file was deleted.

20 changes: 20 additions & 0 deletions src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

/// <summary>
/// Rate limiter configuration
/// </summary>
public sealed class RateLimiterConfiguration
{
/// <summary>
/// The duration after which the configured rate Limit is reset.
/// </summary>
public TimeSpan Duration { get; set; }
/// <summary>
/// The maximum number of message to process within the configured Duration.
/// </summary>
public int Limit { get; set; }
/// <summary>
/// The duration warning threshold after which a Informational log entry if logged to raise awareness of an imbalance between configured rate limit and concurrency. Set to -1 milliseconds to disable.
/// </summary>
public TimeSpan StartDurationThreshold { get; set; } = TimeSpan.FromSeconds(5);
}
38 changes: 28 additions & 10 deletions src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Logging;

namespace NServiceBus
{
Expand All @@ -26,15 +25,15 @@ public static class RateLimiterConfigurationExtension
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond, int burstSize)
{
if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0");
var properties = new global::Properties
var properties = new global::RateLimiterConfiguration
{
Duration = TimeSpan.FromSeconds(1.0 / limitPerSecond * burstSize),
Limit = burstSize,
};
var minimumBurstSize = limitPerSecond / 10D;
if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}.");
if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}");
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

Expand All @@ -49,14 +48,14 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limit, TimeSpan duration)
{
if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit), limit, "Must be larger than 0");
if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms.");
var properties = new global::Properties
if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms");
var properties = new global::RateLimiterConfiguration
{
Duration = duration,
Limit = limit,
};
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

Expand All @@ -71,14 +70,33 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li
public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond)
{
if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0");
var properties = new global::Properties
var properties = new global::RateLimiterConfiguration
{
Duration = TimeSpan.FromSeconds(1),
Limit = limitPerSecond,
};
var settings = instance.GetSettings();
settings.Set<global::Properties>(properties);
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}

/// <summary>
/// Enable rate limiting.
/// </summary>
public static void ApplyRateLimiting(this EndpointConfiguration instance, Action<RateLimiterConfiguration> config)
{
if (config == null) throw new ArgumentNullException(nameof(config));

var properties = new RateLimiterConfiguration();
config(properties);

if (properties.Limit < 1) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Limit), properties.Limit, "Must be larger than 0");
if (properties.Duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Duration), properties.Duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms");
if (properties.StartDurationThreshold == TimeSpan.Zero || properties.StartDurationThreshold < System.Threading.Timeout.InfiniteTimeSpan) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.StartDurationThreshold), properties.StartDurationThreshold, $"Must be larger than 0 or -1ms to disable");

var settings = instance.GetSettings();
settings.Set<global::RateLimiterConfiguration>(properties);
instance.EnableFeature<RateLimiterFeature>();
}
}
}
}
4 changes: 2 additions & 2 deletions src/NServiceBus.RateLimiter/RateLimiterFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ sealed class RateLimiterFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
if (!context.Settings.TryGet<Properties>(out var properties)) return;
if (!context.Settings.TryGet<RateLimiterConfiguration>(out var properties)) return;
LogManager.GetLogger("RateLimiter").InfoFormat("Rate limiter configuration: Limit:{0:N0} Duration:{1:g}", properties.Limit, properties.Duration);
context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration), description: nameof(RateLimitBehavior));
context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration, properties.StartDurationThreshold), description: nameof(RateLimitBehavior));
}
}

0 comments on commit 6794614

Please sign in to comment.