From 67946141706f190a787dc5e26aa29a4276c12b8b Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Sun, 16 Jul 2023 13:40:48 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Ability=20to=20override=20`StartDur?= =?UTF-8?q?ationThreshold`=20to=20change=20the=20duration=E2=80=A6=20(#5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 13 +++++++ .../Pipeline/RateLimitBehavior.cs | 34 +++++++++++------ src/NServiceBus.RateLimiter/Properties.cs | 7 ---- .../RateLimiterConfiguration.cs | 20 ++++++++++ .../RateLimiterConfigurationExtension.cs | 38 ++++++++++++++----- .../RateLimiterFeature.cs | 4 +- 6 files changed, 86 insertions(+), 30 deletions(-) delete mode 100644 src/NServiceBus.RateLimiter/Properties.cs create mode 100644 src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs diff --git a/README.md b/README.md index d5bd726..89e65b0 100644 --- a/README.md +++ b/README.md @@ -79,3 +79,16 @@ endpointConfiguration.ApplyRateLimiting(limitPerSecond:10, burstSize:250); ``` Using this logic to cope with sudden peaks of incoming messages but still maintain an average rate of messages per second. + +### Example: Disable duration log entry message + +By default, is processing does not start within 5 seconds a message is written to the log. This can be disabled. + +```c# +endpointConfiguration.ApplyRateLimiting(cfg=>{ + cfg.Duration = TimeSpan.FromSeconds(1), + cfg.Limit = 10, + cfg.StartDurationThreshold = Timeout.InfiniteTimeSpan; // or TimeSpan.FromMilliseconds(-1); +cfg. +}); +``` \ No newline at end of file diff --git a/src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs b/src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs index 98cbbb7..7e2120f 100644 --- a/src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs +++ b/src/NServiceBus.RateLimiter/Pipeline/RateLimitBehavior.cs @@ -7,25 +7,37 @@ sealed class RateLimitBehavior : IBehavior { readonly ILog Log = LogManager.GetLogger("RateLimiter"); - readonly TimeSpan WarningThresshold = TimeSpan.FromSeconds(5); + readonly bool IsInfoEnabled; + readonly TimeSpan StartDurationThreshold; readonly RateGate Gate; - public RateLimitBehavior(int occurences, TimeSpan duration) + public RateLimitBehavior(int occurrences, TimeSpan duration, TimeSpan warningThreshold) { - Gate = new RateGate(occurences, duration); + Gate = new RateGate(occurrences, duration); + StartDurationThreshold = warningThreshold; + IsInfoEnabled = Log.IsInfoEnabled; } public async Task Invoke(ITransportReceiveContext context, Func next) { - var start = Stopwatch.StartNew(); - await Gate.WaitAsync().ConfigureAwait(false); - var duration = start.Elapsed; - if (duration > WarningThresshold) + if (IsInfoEnabled && StartDurationThreshold > TimeSpan.Zero) { - Log.InfoFormat("Message '{0}' delayed by {1:g} due to throttling. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration.", - context.Message.MessageId, - duration - ); + var start = Stopwatch.StartNew(); + await Gate.WaitAsync().ConfigureAwait(false); + var duration = start.Elapsed; + if (duration > StartDurationThreshold) + { + Log.InfoFormat( + "Message '{0}' delayed due to throttling by {1:g} which is more than the configured `WarningThresshold` value {2:g}. This can conflict with message lease times or transaction timeouts. Consider lowering the burst size or to shorten the rate limiting duration. ", + context.Message.MessageId, + duration, + StartDurationThreshold + ); + } + } + else + { + await Gate.WaitAsync().ConfigureAwait(false); } await next(context).ConfigureAwait(false); } diff --git a/src/NServiceBus.RateLimiter/Properties.cs b/src/NServiceBus.RateLimiter/Properties.cs deleted file mode 100644 index 5707848..0000000 --- a/src/NServiceBus.RateLimiter/Properties.cs +++ /dev/null @@ -1,7 +0,0 @@ -using System; - -class Properties -{ - public TimeSpan Duration; - public int Limit; -} \ No newline at end of file diff --git a/src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs b/src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs new file mode 100644 index 0000000..a8a9d6a --- /dev/null +++ b/src/NServiceBus.RateLimiter/RateLimiterConfiguration.cs @@ -0,0 +1,20 @@ +using System; + +/// +/// Rate limiter configuration +/// +public sealed class RateLimiterConfiguration +{ + /// + /// The duration after which the configured rate Limit is reset. + /// + public TimeSpan Duration { get; set; } + /// + /// The maximum number of message to process within the configured Duration. + /// + public int Limit { get; set; } + /// + /// The duration warning threshold after which a Informational log entry if logged to raise awareness of an imbalance between configured rate limit and concurrency. Set to -1 milliseconds to disable. + /// + public TimeSpan StartDurationThreshold { get; set; } = TimeSpan.FromSeconds(5); +} \ No newline at end of file diff --git a/src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs b/src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs index 0908613..b024972 100644 --- a/src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs +++ b/src/NServiceBus.RateLimiter/RateLimiterConfigurationExtension.cs @@ -1,6 +1,5 @@ using System; using NServiceBus.Configuration.AdvancedExtensibility; -using NServiceBus.Logging; namespace NServiceBus { @@ -26,15 +25,15 @@ public static class RateLimiterConfigurationExtension public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond, int burstSize) { if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0"); - var properties = new global::Properties + var properties = new global::RateLimiterConfiguration { Duration = TimeSpan.FromSeconds(1.0 / limitPerSecond * burstSize), Limit = burstSize, }; var minimumBurstSize = limitPerSecond / 10D; - if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}."); + if (burstSize < minimumBurstSize) throw new ArgumentOutOfRangeException(nameof(burstSize), burstSize, $"Must be larger than {Math.Ceiling(minimumBurstSize):N0} when {nameof(limitPerSecond)}={limitPerSecond}"); var settings = instance.GetSettings(); - settings.Set(properties); + settings.Set(properties); instance.EnableFeature(); } @@ -49,14 +48,14 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li public static void ApplyRateLimiting(this EndpointConfiguration instance, int limit, TimeSpan duration) { if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit), limit, "Must be larger than 0"); - if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms."); - var properties = new global::Properties + if (duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(duration), duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms"); + var properties = new global::RateLimiterConfiguration { Duration = duration, Limit = limit, }; var settings = instance.GetSettings(); - settings.Set(properties); + settings.Set(properties); instance.EnableFeature(); } @@ -71,14 +70,33 @@ public static void ApplyRateLimiting(this EndpointConfiguration instance, int li public static void ApplyRateLimiting(this EndpointConfiguration instance, int limitPerSecond) { if (limitPerSecond < 1) throw new ArgumentOutOfRangeException(nameof(limitPerSecond), limitPerSecond, "Must be larger than 0"); - var properties = new global::Properties + var properties = new global::RateLimiterConfiguration { Duration = TimeSpan.FromSeconds(1), Limit = limitPerSecond, }; var settings = instance.GetSettings(); - settings.Set(properties); + settings.Set(properties); + instance.EnableFeature(); + } + + /// + /// Enable rate limiting. + /// + public static void ApplyRateLimiting(this EndpointConfiguration instance, Action config) + { + if (config == null) throw new ArgumentNullException(nameof(config)); + + var properties = new RateLimiterConfiguration(); + config(properties); + + if (properties.Limit < 1) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Limit), properties.Limit, "Must be larger than 0"); + if (properties.Duration < MinimumDuration) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.Duration), properties.Duration, $"Must be larger than {MinimumDuration.TotalMilliseconds}ms"); + if (properties.StartDurationThreshold == TimeSpan.Zero || properties.StartDurationThreshold < System.Threading.Timeout.InfiniteTimeSpan) throw new ArgumentOutOfRangeException(nameof(RateLimiterConfiguration.StartDurationThreshold), properties.StartDurationThreshold, $"Must be larger than 0 or -1ms to disable"); + + var settings = instance.GetSettings(); + settings.Set(properties); instance.EnableFeature(); } } -} +} \ No newline at end of file diff --git a/src/NServiceBus.RateLimiter/RateLimiterFeature.cs b/src/NServiceBus.RateLimiter/RateLimiterFeature.cs index 9efd2a4..e11204e 100644 --- a/src/NServiceBus.RateLimiter/RateLimiterFeature.cs +++ b/src/NServiceBus.RateLimiter/RateLimiterFeature.cs @@ -5,8 +5,8 @@ sealed class RateLimiterFeature : Feature { protected override void Setup(FeatureConfigurationContext context) { - if (!context.Settings.TryGet(out var properties)) return; + if (!context.Settings.TryGet(out var properties)) return; LogManager.GetLogger("RateLimiter").InfoFormat("Rate limiter configuration: Limit:{0:N0} Duration:{1:g}", properties.Limit, properties.Duration); - context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration), description: nameof(RateLimitBehavior)); + context.Pipeline.Register(behavior: new RateLimitBehavior(properties.Limit, properties.Duration, properties.StartDurationThreshold), description: nameof(RateLimitBehavior)); } } \ No newline at end of file