Skip to content

Commit

Permalink
Limit pool capacity, add LinkTo helper
Browse files Browse the repository at this point in the history
  • Loading branch information
MihaZupan committed Oct 20, 2021
1 parent 2a50622 commit 733e986
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/ReverseProxy/Forwarder/StreamCopyHttpContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async Task SerializeToStreamAsync(Stream stream, TransportContext? context, Canc
if (_activityToken.Token != cancellationToken)
{
Debug.Assert(cancellationToken.CanBeCanceled);
registration = cancellationToken.UnsafeRegister(ActivityCancellationTokenSource.LinkedTokenCancelDelegate, _activityToken);
registration = _activityToken.LinkTo(cancellationToken);
}
#else
// On .NET Core 3.1, cancellationToken will always be CancellationToken.None
Expand Down
26 changes: 21 additions & 5 deletions src/ReverseProxy/Utilities/ActivityCancellationTokenSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ namespace Yarp.ReverseProxy.Utilities
internal sealed class ActivityCancellationTokenSource : CancellationTokenSource
{
#if NET6_0_OR_GREATER
private const int MaxQueueSize = 1024;
private static readonly ConcurrentQueue<ActivityCancellationTokenSource> _sharedSources = new();
private static int _count;
#endif

public static readonly Action<object?> LinkedTokenCancelDelegate = static s =>
private static readonly Action<object?> _linkedTokenCancelDelegate = static s =>
{
((ActivityCancellationTokenSource)s!).Cancel(throwOnFirstException: false);
};
Expand All @@ -31,7 +33,11 @@ public void ResetTimeout()
public static ActivityCancellationTokenSource Rent(TimeSpan activityTimeout, CancellationToken linkedToken)
{
#if NET6_0_OR_GREATER
if (!_sharedSources.TryDequeue(out var cts))
if (_sharedSources.TryDequeue(out var cts))
{
Interlocked.Decrement(ref _count);
}
else
{
cts = new ActivityCancellationTokenSource();
}
Expand All @@ -40,7 +46,7 @@ public static ActivityCancellationTokenSource Rent(TimeSpan activityTimeout, Can
#endif

cts._activityTimeoutMs = (int)activityTimeout.TotalMilliseconds;
cts._linkedRegistration = linkedToken.UnsafeRegister(LinkedTokenCancelDelegate, cts);
cts._linkedRegistration = cts.LinkTo(linkedToken);
cts.ResetTimeout();

return cts;
Expand All @@ -54,12 +60,22 @@ public void Return()
#if NET6_0_OR_GREATER
if (TryReset())
{
_sharedSources.Enqueue(this);
return;
if (Interlocked.Increment(ref _count) <= MaxQueueSize)
{
_sharedSources.Enqueue(this);
return;
}

Interlocked.Decrement(ref _count);
}
#endif

Dispose();
}

public CancellationTokenRegistration LinkTo(CancellationToken linkedToken)
{
return linkedToken.UnsafeRegister(_linkedTokenCancelDelegate, this);
}
}
}

0 comments on commit 733e986

Please sign in to comment.