From 60f4113b52244c66a7f50380ad7805d844be0c8b Mon Sep 17 00:00:00 2001 From: martintmk Date: Wed, 26 Apr 2023 13:46:56 +0200 Subject: [PATCH] Implement HedgingResilienceStrategy --- .github/dependabot.yml | 1 + src/Directory.Packages.props | 3 +- .../Benchmarks/HedgingBenchmark.cs | 42 + .../Internals/Helper.Hedging.cs | 23 + src/Polly.Core.Benchmarks/README.md | 9 + .../Controller/HedgingControllerTests.cs | 36 + .../HedgingExecutionContextTests.cs | 490 ++++++++++++ .../Hedging/Controller/TaskExecutionTests.cs | 269 +++++++ .../HedgingActionGeneratorArgumentsTests.cs | 15 + .../Hedging/HedgingActions.cs | 62 ++ .../Hedging/HedgingHandlerTests.cs | 12 +- ...esilienceStrategyBuilderExtensionsTests.cs | 43 + .../Hedging/HedgingResilienceStrategyTests.cs | 756 +++++++++++++++++- .../HedgingStrategyOptionsTResultTests.cs | 2 +- .../Hedging/HedgingTimeProvider.cs | 55 ++ .../Hedging/PrimaryStringTasks.cs | 33 + .../Helpers/DisposableResult.cs | 25 + .../ResilienceContextTests.cs | 19 + .../ResiliencePropertiesTests.cs | 30 + .../Utils/DisposeHelperTests.cs | 44 + .../Hedging/Controller/ContextSnapshot.cs | 3 + .../Hedging/Controller/HedgedTaskType.cs | 7 + .../Hedging/Controller/HedgingController.cs | 55 ++ .../Controller/HedgingExecutionContext.cs | 263 ++++++ .../Hedging/Controller/TaskExecution.cs | 212 +++++ ...HedgingActionGeneratorArguments.TResult.cs | 11 +- .../HedgingActionGeneratorArguments.cs | 11 +- src/Polly.Core/Hedging/HedgingConstants.cs | 2 + .../Hedging/HedgingHandler.Handler.cs | 20 +- src/Polly.Core/Hedging/HedgingHandler.cs | 23 +- .../Hedging/HedgingResilienceStrategy.cs | 89 ++- ...gingResilienceStrategyBuilderExtensions.cs | 2 +- src/Polly.Core/Polly.Core.csproj | 1 + src/Polly.Core/ResilienceContext.cs | 14 +- src/Polly.Core/ResilienceProperties.cs | 12 + src/Polly.Core/Strategy/Outcome.cs | 15 + src/Polly.Core/Utils/DisposeHelper.cs | 28 + src/Polly.Core/Utils/IObjectPool.cs | 23 + src/Polly.Core/Utils/ObjectPool.cs | 11 +- 39 files changed, 2728 insertions(+), 43 deletions(-) create mode 100644 src/Polly.Core.Benchmarks/Benchmarks/HedgingBenchmark.cs create mode 100644 src/Polly.Core.Benchmarks/Internals/Helper.Hedging.cs create mode 100644 src/Polly.Core.Tests/Hedging/Controller/HedgingControllerTests.cs create mode 100644 src/Polly.Core.Tests/Hedging/Controller/HedgingExecutionContextTests.cs create mode 100644 src/Polly.Core.Tests/Hedging/Controller/TaskExecutionTests.cs create mode 100644 src/Polly.Core.Tests/Hedging/HedgingActionGeneratorArgumentsTests.cs create mode 100644 src/Polly.Core.Tests/Hedging/HedgingActions.cs create mode 100644 src/Polly.Core.Tests/Hedging/HedgingTimeProvider.cs create mode 100644 src/Polly.Core.Tests/Hedging/PrimaryStringTasks.cs create mode 100644 src/Polly.Core.Tests/Helpers/DisposableResult.cs create mode 100644 src/Polly.Core.Tests/Utils/DisposeHelperTests.cs create mode 100644 src/Polly.Core/Hedging/Controller/ContextSnapshot.cs create mode 100644 src/Polly.Core/Hedging/Controller/HedgedTaskType.cs create mode 100644 src/Polly.Core/Hedging/Controller/HedgingController.cs create mode 100644 src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs create mode 100644 src/Polly.Core/Hedging/Controller/TaskExecution.cs create mode 100644 src/Polly.Core/Utils/DisposeHelper.cs create mode 100644 src/Polly.Core/Utils/IObjectPool.cs diff --git a/.github/dependabot.yml b/.github/dependabot.yml index e7e5b1b85b0..67ef2b7ecc0 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -24,3 +24,4 @@ updates: - dependency-name: "Microsoft.Extensions.Logging" - dependency-name: "System.Diagnostics.DiagnosticSource" - dependency-name: "System.Threading.RateLimiting" + - dependency-name: "Microsoft.Bcl.AsyncInterfaces" diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 2294520024f..28109d62e23 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -4,6 +4,7 @@ + @@ -34,4 +35,4 @@ - \ No newline at end of file + diff --git a/src/Polly.Core.Benchmarks/Benchmarks/HedgingBenchmark.cs b/src/Polly.Core.Benchmarks/Benchmarks/HedgingBenchmark.cs new file mode 100644 index 00000000000..1395a6d4dbf --- /dev/null +++ b/src/Polly.Core.Benchmarks/Benchmarks/HedgingBenchmark.cs @@ -0,0 +1,42 @@ +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Polly.Core.Benchmarks; + +namespace Polly.Benchmarks; + +public class HedgingBenchmark +{ + private ResilienceStrategy? _strategy; + + [GlobalSetup] + public void Setup() + { + _strategy = Helper.CreateHedging(); + } + + [Benchmark(Baseline = true)] + public async ValueTask Hedging_Primary() + => await _strategy!.ExecuteValueTaskAsync(static _ => new ValueTask("primary")).ConfigureAwait(false); + + [Benchmark] + public async ValueTask Hedging_Secondary() + => await _strategy!.ExecuteValueTaskAsync(static _ => new ValueTask(Helper.Failure)).ConfigureAwait(false); + + [Benchmark] + public async ValueTask Hedging_Primary_AsyncWork() + => await _strategy!.ExecuteValueTaskAsync( + static async _ => + { + await Task.Yield(); + return "primary"; + }).ConfigureAwait(false); + + [Benchmark] + public async ValueTask Hedging_Secondary_AsyncWork() + => await _strategy!.ExecuteValueTaskAsync( + static async _ => + { + await Task.Yield(); + return Helper.Failure; + }).ConfigureAwait(false); +} diff --git a/src/Polly.Core.Benchmarks/Internals/Helper.Hedging.cs b/src/Polly.Core.Benchmarks/Internals/Helper.Hedging.cs new file mode 100644 index 00000000000..0d566a85e25 --- /dev/null +++ b/src/Polly.Core.Benchmarks/Internals/Helper.Hedging.cs @@ -0,0 +1,23 @@ +using Polly.Hedging; + +namespace Polly.Core.Benchmarks; + +internal static partial class Helper +{ + public const string Failure = "failure"; + + public static ResilienceStrategy CreateHedging() + { + return CreateStrategy(builder => + { + builder.AddHedging(new HedgingStrategyOptions + { + Handler = new HedgingHandler().SetHedging(handler => + { + handler.ShouldHandle.HandleResult(Failure); + handler.HedgingActionGenerator = args => () => Task.FromResult("hedged response"); + }) + }); + }); + } +} diff --git a/src/Polly.Core.Benchmarks/README.md b/src/Polly.Core.Benchmarks/README.md index fc24c8c6126..6911a8ef218 100644 --- a/src/Polly.Core.Benchmarks/README.md +++ b/src/Polly.Core.Benchmarks/README.md @@ -60,3 +60,12 @@ LaunchCount=2 WarmupCount=10 |--------------------------- |---------:|----------:|----------:|------:|-------:|----------:|------------:| | ExecuteStrategyPipeline_V7 | 1.523 us | 0.0092 us | 0.0137 us | 1.00 | 0.3433 | 2872 B | 1.00 | | ExecuteStrategyPipeline_V8 | 1.276 us | 0.0128 us | 0.0191 us | 0.84 | 0.0114 | 96 B | 0.03 | + +## HEDGING + +| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Gen1 | Allocated | Alloc Ratio | +|---------------------------- |-----------:|----------:|----------:|------:|--------:|-------:|-------:|----------:|------------:| +| Hedging_Primary | 891.3 ns | 39.39 ns | 58.96 ns | 1.00 | 0.00 | 0.0048 | - | 40 B | 1.00 | +| Hedging_Secondary | 1,500.0 ns | 7.88 ns | 11.80 ns | 1.69 | 0.11 | 0.0229 | - | 200 B | 5.00 | +| Hedging_Primary_AsyncWork | 4,250.9 ns | 140.89 ns | 206.52 ns | 4.78 | 0.34 | 0.1831 | 0.0305 | 1518 B | 37.95 | +| Hedging_Secondary_AsyncWork | 6,544.9 ns | 99.90 ns | 143.27 ns | 7.34 | 0.39 | 0.2213 | 0.0839 | 1872 B | 46.80 | diff --git a/src/Polly.Core.Tests/Hedging/Controller/HedgingControllerTests.cs b/src/Polly.Core.Tests/Hedging/Controller/HedgingControllerTests.cs new file mode 100644 index 00000000000..fe3ae6939b5 --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/Controller/HedgingControllerTests.cs @@ -0,0 +1,36 @@ +using Polly.Hedging; +using Polly.Hedging.Utils; + +namespace Polly.Core.Tests.Hedging.Controller; + +public class HedgingControllerTests +{ + [Fact] + public async Task Pooling_Ok() + { + var handler = new HedgingHandler().SetHedging(handler => handler.HedgingActionGenerator = args => null).CreateHandler(); + var controller = new HedgingController(new HedgingTimeProvider(), handler!, 3); + + var context1 = controller.GetContext(ResilienceContext.Get()); + await PrepareAsync(context1); + + var context2 = controller.GetContext(ResilienceContext.Get()); + await PrepareAsync(context2); + + controller.RentedContexts.Should().Be(2); + controller.RentedExecutions.Should().Be(2); + + context1.Complete(); + context2.Complete(); + + controller.RentedContexts.Should().Be(0); + controller.RentedExecutions.Should().Be(0); + } + + private static async Task PrepareAsync(HedgingExecutionContext context) + { + await context.LoadExecutionAsync((_, _) => new ValueTask(10), "state"); + await context.TryWaitForCompletedExecutionAsync(HedgingStrategyOptions.InfiniteHedgingDelay); + context.Tasks[0].AcceptOutcome(); + } +} diff --git a/src/Polly.Core.Tests/Hedging/Controller/HedgingExecutionContextTests.cs b/src/Polly.Core.Tests/Hedging/Controller/HedgingExecutionContextTests.cs new file mode 100644 index 00000000000..8e60d247f21 --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/Controller/HedgingExecutionContextTests.cs @@ -0,0 +1,490 @@ +using System; +using System.Globalization; +using System.Threading.Tasks; +using Polly.Core.Tests.Helpers; +using Polly.Hedging; +using Polly.Hedging.Controller; +using Polly.Hedging.Utils; +using Polly.Strategy; +using Polly.Utils; + +namespace Polly.Core.Tests.Hedging.Controller; +public class HedgingExecutionContextTests : IDisposable +{ + private const string Handled = "Handled"; + private static readonly TimeSpan AssertTimeout = TimeSpan.FromSeconds(5); + private readonly ResiliencePropertyKey _myKey = new("my-key"); + private readonly HedgingHandler _hedgingHandler; + private readonly CancellationTokenSource _cts; + private readonly HedgingTimeProvider _timeProvider; + private readonly List _createdExecutions = new(); + private readonly List _returnedExecutions = new(); + private readonly List _resets = new(); + private readonly ResilienceContext _resilienceContext; + private readonly AutoResetEvent _onReset = new(false); + private int _maxAttempts = 2; + + public HedgingExecutionContextTests() + { + _timeProvider = new HedgingTimeProvider(); + _cts = new CancellationTokenSource(); + _hedgingHandler = new HedgingHandler(); + _hedgingHandler.SetHedging(handler => + { + handler.ShouldHandle + .HandleResult(r => r!.Name == Handled) + .HandleException(); + handler.HedgingActionGenerator = args => Generator(args); + }); + + _resilienceContext = ResilienceContext.Get().Initialize(false); + _resilienceContext.CancellationToken = _cts.Token; + _resilienceContext.Properties.Set(_myKey, "dummy"); + } + + public void Dispose() + { + _cts.Dispose(); + _onReset.Dispose(); + } + + [Fact] + public void Ctor_Ok() + { + var context = Create(); + + context.LoadedTasks.Should().Be(0); + context.Snapshot.Context.Should().BeNull(); + + context.Should().NotBeNull(); + } + + [Fact] + public void Initialize_Ok() + { + var props = _resilienceContext.Properties; + var context = Create(); + + context.Initialize(_resilienceContext); + + context.Snapshot.Context.Should().Be(_resilienceContext); + context.Snapshot.Context.Properties.Should().NotBeSameAs(props); + context.Snapshot.OriginalProperties.Should().BeSameAs(props); + context.Snapshot.OriginalCancellationToken.Should().Be(_cts.Token); + context.Snapshot.Context.Properties.Should().HaveCount(1); + context.IsInitialized.Should().BeTrue(); + } + + [InlineData(0)] + [InlineData(1)] + [InlineData(-1)] + [Theory] + public async Task TryWaitForCompletedExecutionAsync_Initialized_Ok(int delay) + { + var context = Create(); + context.Initialize(_resilienceContext); + var delayTimeSpan = TimeSpan.FromSeconds(delay); + + var task = context.TryWaitForCompletedExecutionAsync(delayTimeSpan); + + _timeProvider.Advance(TimeSpan.FromHours(1)); + + (await task).Should().BeNull(); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_FinishedTask_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + await context.LoadExecutionAsync((_, _) => new ValueTask("dummy"), "state"); + + var task = await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + + task.Should().NotBeNull(); + task!.ExecutionTask!.IsCompleted.Should().BeTrue(); + task.Outcome.Result.Should().Be("dummy"); + task.AcceptOutcome(); + context.LoadedTasks.Should().Be(1); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_ConcurrentExecution_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.FromHours(1), TimeSpan.FromHours(1)); + + for (int i = 0; i < _maxAttempts - 1; i++) + { + await LoadExecutionAsync(context, TimeSpan.FromHours(1)); + } + + for (int i = 0; i < _maxAttempts; i++) + { + (await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero)).Should().BeNull(); + } + + _timeProvider.Advance(TimeSpan.FromDays(1)); + await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + await context.Tasks.First().ExecutionTask!; + context.Tasks.First().AcceptOutcome(); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_SynchronousExecution_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.FromHours(1), TimeSpan.FromHours(1)); + + for (int i = 0; i < _maxAttempts - 1; i++) + { + await LoadExecutionAsync(context, TimeSpan.FromHours(1)); + } + + var task = context.TryWaitForCompletedExecutionAsync(HedgingStrategyOptions.InfiniteHedgingDelay).AsTask(); + task.Wait(20).Should().BeFalse(); + _timeProvider.Advance(TimeSpan.FromDays(1)); + await task; + context.Tasks.First().AcceptOutcome(); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_HedgedExecution_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.FromHours(1), TimeSpan.FromHours(1)); + + for (int i = 0; i < _maxAttempts - 1; i++) + { + await LoadExecutionAsync(context, TimeSpan.FromHours(1)); + } + + var hedgingDelay = TimeSpan.FromSeconds(5); + var count = _timeProvider.DelayEntries.Count; + var task = context.TryWaitForCompletedExecutionAsync(hedgingDelay).AsTask(); + task.Wait(20).Should().BeFalse(); + _timeProvider.DelayEntries.Should().HaveCount(count + 1); + _timeProvider.DelayEntries.Last().Delay.Should().Be(hedgingDelay); + _timeProvider.Advance(TimeSpan.FromDays(1)); + await task; + await context.Tasks.First().ExecutionTask!; + context.Tasks.First().AcceptOutcome(); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_TwiceWhenSecondaryGeneratorNotRegistered_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + await context.LoadExecutionAsync((_, _) => new ValueTask("dummy"), "state"); + await context.LoadExecutionAsync((_, _) => new ValueTask("dummy"), "state"); + + var task = await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + + task!.AcceptOutcome(); + context.LoadedTasks.Should().Be(1); + } + + [Fact] + public async Task TryWaitForCompletedExecutionAsync_TwiceWhenSecondaryGeneratorRegistered_Ok() + { + var context = Create(); + context.Initialize(_resilienceContext); + await LoadExecutionAsync(context); + await LoadExecutionAsync(context); + + Generator = args => () => Task.FromResult(new DisposableResult { Name = "secondary" }); + + var task = await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + task!.Type.Should().Be(HedgedTaskType.Primary); + task!.AcceptOutcome(); + context.LoadedTasks.Should().Be(2); + context.Tasks[0].Type.Should().Be(HedgedTaskType.Primary); + context.Tasks[1].Type.Should().Be(HedgedTaskType.Secondary); + } + + [Fact] + public async Task LoadExecutionAsync_MaxTasks_NoMoreTasksAdded() + { + _maxAttempts = 3; + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.FromHours(1), TimeSpan.FromHours(1), TimeSpan.FromHours(1), TimeSpan.FromHours(1)); + + for (int i = 0; i < _maxAttempts; i++) + { + (await LoadExecutionAsync(context)).Loaded.Should().BeTrue(); + } + + (await LoadExecutionAsync(context)).Loaded.Should().BeFalse(); + + context.LoadedTasks.Should().Be(_maxAttempts); + context.Tasks[0].AcceptOutcome(); + _returnedExecutions.Should().HaveCount(0); + } + + [Fact] + public async Task LoadExecutionAsync_EnsureCorrectAttemptNumber() + { + var attempt = -1; + var context = Create(); + context.Initialize(_resilienceContext); + Generator = args => + { + attempt = args.Attempt; + return null; + }; + + await LoadExecutionAsync(context); + await LoadExecutionAsync(context); + + // primary is 0, this one is 1 + attempt.Should().Be(1); + } + + [InlineData(true)] + [InlineData(false)] + [Theory] + public async Task LoadExecutionAsync_NoMoreSecondaryTasks_AcceptFinishedOutcome(bool allExecuted) + { + _maxAttempts = 4; + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(allExecuted ? TimeSpan.Zero : TimeSpan.FromHours(1)); + + // primary + await LoadExecutionAsync(context); + + // secondary + await LoadExecutionAsync(context); + + // secondary couldn't be created + if (allExecuted) + { + await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + await context.TryWaitForCompletedExecutionAsync(TimeSpan.Zero); + } + + var pair = await LoadExecutionAsync(context); + pair.Loaded.Should().BeFalse(); + + _returnedExecutions.Count.Should().Be(1); + if (allExecuted) + { + pair.Outcome.Should().NotBeNull(); + context.Tasks[0].IsAccepted.Should().BeTrue(); + } + else + { + pair.Outcome.Should().BeNull(); + context.Tasks[0].IsAccepted.Should().BeFalse(); + } + + context.Tasks[0].AcceptOutcome(); + } + + [Fact] + public async Task LoadExecution_NoMoreTasks_Throws() + { + _maxAttempts = 0; + var context = Create(); + context.Initialize(_resilienceContext); + + await context.Invoking(c => LoadExecutionAsync(c)).Should().ThrowAsync(); + } + + [InlineData(true)] + [InlineData(false)] + [Theory] + public async Task Complete_EnsureOriginalContextPreparedWithAcceptedOutcome(bool primary) + { + // arrange + var type = primary ? HedgedTaskType.Primary : HedgedTaskType.Secondary; + var context = Create(); + var originalProps = _resilienceContext.Properties; + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.Zero); + await ExecuteAllTasksAsync(context, 2); + context.Tasks.First(v => v.Type == type).AcceptOutcome(); + + // act + context.Complete(); + + // assert + _resilienceContext.Properties.Should().BeSameAs(originalProps); + if (primary) + { + _resilienceContext.Properties.Should().HaveCount(1); + _resilienceContext.ResilienceEvents.Should().HaveCount(0); + } + else + { + _resilienceContext.Properties.Should().HaveCount(2); + _resilienceContext.ResilienceEvents.Should().HaveCount(1); + } + } + + [Fact] + public void Complete_NoTasks_Throws() + { + var context = Create(); + context.Invoking(c => c.Complete()).Should().Throw().WithMessage("The hedging must execute at least one task."); + } + + [Fact] + public async Task Complete_NoAcceptedTasks_Throws() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.Zero); + await ExecuteAllTasksAsync(context, 2); + + context.Invoking(c => c.Complete()).Should().Throw().WithMessage("There must be exactly one accepted outcome for hedging. Found 0."); + } + + [Fact] + public async Task Complete_MultipleAcceptedTasks_Throws() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.Zero); + await ExecuteAllTasksAsync(context, 2); + context.Tasks[0].AcceptOutcome(); + context.Tasks[1].AcceptOutcome(); + + context.Invoking(c => c.Complete()).Should().Throw().WithMessage("There must be exactly one accepted outcome for hedging. Found 2."); + } + + [Fact] + public async Task Complete_EnsurePendingTasksCleaned() + { + using var assertPrimary = new ManualResetEvent(false); + using var assertSecondary = new ManualResetEvent(false); + + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.FromHours(1)); + (await LoadExecutionAsync(context)).Execution!.OnReset = (execution) => + { + execution.Outcome.Result.Should().BeOfType(); + execution.Outcome.Exception.Should().BeNull(); + assertPrimary.Set(); + }; + (await LoadExecutionAsync(context)).Execution!.OnReset = (execution) => + { + execution.Outcome.Exception.Should().BeAssignableTo(); + assertSecondary.Set(); + }; + + await context.TryWaitForCompletedExecutionAsync(HedgingStrategyOptions.InfiniteHedgingDelay); + + var pending = context.Tasks[1].ExecutionTask!; + pending.Wait(10).Should().BeFalse(); + + context.Tasks[0].AcceptOutcome(); + context.Complete(); + + await pending; + + assertPrimary.WaitOne(AssertTimeout).Should().BeTrue(); + assertSecondary.WaitOne(AssertTimeout).Should().BeTrue(); + } + + [Fact] + public async Task Complete_EnsureCleaned() + { + var context = Create(); + context.Initialize(_resilienceContext); + ConfigureSecondaryTasks(TimeSpan.Zero); + await ExecuteAllTasksAsync(context, 2); + context.Tasks[0].AcceptOutcome(); + + context.Complete(); + + context.LoadedTasks.Should().Be(0); + context.Snapshot.Context.Should().BeNull(); + + _onReset.WaitOne(AssertTimeout); + _resets.Count.Should().Be(1); + _returnedExecutions.Count.Should().Be(2); + } + + private async Task ExecuteAllTasksAsync(HedgingExecutionContext context, int count) + { + for (int i = 0; i < count; i++) + { + (await LoadExecutionAsync(context)).Loaded.Should().BeTrue(); + await context.TryWaitForCompletedExecutionAsync(HedgingStrategyOptions.InfiniteHedgingDelay); + } + } + + private async Task> LoadExecutionAsync(HedgingExecutionContext context, TimeSpan? primaryDelay = null, bool error = false) + { + return await context.LoadExecutionAsync( + async (c, _) => + { + if (primaryDelay != null) + { + await _timeProvider.Delay(primaryDelay.Value, c.CancellationToken); + } + + if (error) + { + throw new InvalidOperationException("Forced error."); + } + + return new DisposableResult { Name = "primary" }; + }, + "state"); + } + + private void ConfigureSecondaryTasks(params TimeSpan[] delays) + { + Generator = args => + { + var attempt = args.Attempt - 1; + + if (attempt >= delays.Length) + { + return null; + } + + args.Context.AddResilienceEvent(new ReportedResilienceEvent("dummy-event")); + + return async () => + { + args.Context.Properties.Set(new ResiliencePropertyKey(attempt.ToString(CultureInfo.InvariantCulture)), attempt); + await _timeProvider.Delay(delays[attempt], args.Context.CancellationToken); + return new DisposableResult(delays[attempt].ToString()); + }; + }; + } + + private HedgingActionGenerator Generator { get; set; } = args => () => Task.FromResult(new DisposableResult { Name = Handled }); + + private HedgingExecutionContext Create() + { + var handler = _hedgingHandler.CreateHandler()!; + var pool = new ObjectPool( + () => + { + var execution = new TaskExecution(handler); + _createdExecutions.Add(execution); + return execution; + }, + execution => + { + _returnedExecutions.Add(execution); + return true; + }); + + return new(pool, _timeProvider, _maxAttempts, context => + { + _resets.Add(context); + _onReset.Set(); + }); + } +} diff --git a/src/Polly.Core.Tests/Hedging/Controller/TaskExecutionTests.cs b/src/Polly.Core.Tests/Hedging/Controller/TaskExecutionTests.cs new file mode 100644 index 00000000000..cc8cfb9fba3 --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/Controller/TaskExecutionTests.cs @@ -0,0 +1,269 @@ +using System; +using System.Threading.Tasks; +using Polly.Core.Tests.Helpers; +using Polly.Hedging; +using Polly.Hedging.Controller; +using Polly.Hedging.Utils; +using Polly.Strategy; + +namespace Polly.Core.Tests.Hedging.Controller; +public class TaskExecutionTests : IDisposable +{ + private const string Handled = "Handled"; + private readonly ResiliencePropertyKey _myKey = new("my-key"); + private readonly HedgingHandler _hedgingHandler; + private readonly CancellationTokenSource _cts; + private readonly HedgingTimeProvider _timeProvider; + private ContextSnapshot _snapshot; + + public TaskExecutionTests() + { + _timeProvider = new HedgingTimeProvider(); + _cts = new CancellationTokenSource(); + _hedgingHandler = new HedgingHandler(); + _hedgingHandler.SetHedging(handler => + { + handler.ShouldHandle + .HandleResult(r => r!.Name == Handled) + .HandleException(); + handler.HedgingActionGenerator = args => Generator(args); + }); + + CreateSnapshot(); + } + + public void Dispose() => _cts.Dispose(); + + [InlineData(Handled, true)] + [InlineData("Unhandled", false)] + [Theory] + public async Task Initialize_Primary_Ok(string value, bool handled) + { + var execution = Create(); + await execution.InitializeAsync(HedgedTaskType.Primary, _snapshot, + (context, state) => + { + AssertPrimaryContext(context, execution); + state.Should().Be("dummy-state"); + return new ValueTask(new DisposableResult { Name = value }); + }, + "dummy-state", + 1); + + await execution.ExecutionTask!; + ((DisposableResult)execution.Outcome.Result!).Name.Should().Be(value); + execution.IsHandled.Should().Be(handled); + AssertPrimaryContext(execution.Context, execution); + } + + [InlineData(Handled, true)] + [InlineData("Unhandled", false)] + [Theory] + public async Task Initialize_Secondary_Ok(string value, bool handled) + { + var execution = Create(); + Generator = args => + { + AssertSecondaryContext(args.Context, execution); + args.Attempt.Should().Be(4); + return () => Task.FromResult(new DisposableResult { Name = value }); + }; + + (await execution.InitializeAsync(HedgedTaskType.Secondary, _snapshot, null!, "dummy-state", 4)).Should().BeTrue(); + + await execution.ExecutionTask!; + + ((DisposableResult)execution.Outcome.Result!).Name.Should().Be(value); + execution.IsHandled.Should().Be(handled); + AssertSecondaryContext(execution.Context, execution); + } + + [Fact] + public async Task Initialize_SecondaryWhenTaskGeneratorReturnsNull_Ok() + { + var execution = Create(); + Generator = args => null; + + (await execution.InitializeAsync(HedgedTaskType.Secondary, _snapshot, null!, "dummy-state", 4)).Should().BeFalse(); + + execution.Invoking(e => e.Context).Should().Throw(); + } + + [Fact] + public async Task Cancel_Accepted_NoEffect() + { + var execution = Create(); + var cancelled = false; + + await InitializePrimaryAsync(execution, onContext: context => context.CancellationToken.Register(() => cancelled = true)); + + execution.AcceptOutcome(); + execution.Cancel(); + + cancelled.Should().BeFalse(); + } + + [Fact] + public async Task Cancel_NotAccepted_Cancelled() + { + var execution = Create(); + var cancelled = false; + + await InitializePrimaryAsync(execution, onContext: context => context.CancellationToken.Register(() => cancelled = true)); + + execution.Cancel(); + cancelled.Should().BeTrue(); + } + + [Fact] + public async Task Initialize_SecondaryWhenTaskGeneratorThrows_EnsureOutcome() + { + var execution = Create(); + Generator = args => throw new FormatException(); + + (await execution.InitializeAsync(HedgedTaskType.Secondary, _snapshot, null!, "dummy-state", 4)).Should().BeTrue(); + + await execution.ExecutionTask!; + execution.Outcome.Exception.Should().BeOfType(); + } + + [Fact] + public async Task Initialize_ExecutionTaskDoesNotThrows() + { + var execution = Create(); + Generator = args => throw new FormatException(); + + (await execution.InitializeAsync(HedgedTaskType.Secondary, _snapshot, null!, "dummy-state", 4)).Should().BeTrue(); + + await execution.ExecutionTask!.Invoking(async t => await t).Should().NotThrowAsync(); + } + + [InlineData(true)] + [InlineData(false)] + [Theory] + public async Task Initialize_Cancelled_EnsureRespected(bool primary) + { + // arrange + Generator = (args) => + { + return async () => + { + await _timeProvider.Delay(TimeSpan.FromDays(1), args.Context.CancellationToken); + return new DisposableResult { Name = Handled }; + }; + }; + + var execution = Create(); + + await execution.InitializeAsync(primary ? HedgedTaskType.Primary : HedgedTaskType.Secondary, _snapshot, + async (context, _) => + { + await _timeProvider.Delay(TimeSpan.FromDays(1), context.CancellationToken); + return new DisposableResult(); + }, + "dummy-state", + 1); + + // act + _cts.Cancel(); + await execution.ExecutionTask!; + + // assert + execution.Outcome.Exception.Should().BeAssignableTo(); + } + + [Fact] + public void AcceptOutcome_NotInitialized_Throws() + { + var execution = Create(); + + execution.Invoking(e => e.AcceptOutcome()).Should().Throw(); + } + + [InlineData(true)] + [InlineData(false)] + [Theory] + public async Task ResetAsync_Ok(bool accept) + { + // arrange + using var result = new DisposableResult(); + var execution = Create(); + var token = default(CancellationToken); + await InitializePrimaryAsync(execution, result, context => token = context.CancellationToken); + await execution.ExecutionTask!; + + if (accept) + { + execution.AcceptOutcome(); + } + + // act + await execution.ResetAsync(); + + // assert + execution.IsAccepted.Should().BeFalse(); + execution.IsHandled.Should().BeFalse(); + execution.Properties.Should().HaveCount(0); + execution.Outcome.IsValid.Should().BeFalse(); + execution.Invoking(e => e.Context).Should().Throw(); +#if !NETCOREAPP + token.Invoking(t => t.WaitHandle).Should().Throw(); +#endif + if (accept) + { + result.IsDisposed.Should().BeFalse(); + } + else + { + result.IsDisposed.Should().BeTrue(); + } + } + + private async Task InitializePrimaryAsync(TaskExecution execution, DisposableResult? result = null, Action? onContext = null) + { + await execution.InitializeAsync(HedgedTaskType.Primary, _snapshot, (context, _) => + { + onContext?.Invoke(context); + return new ValueTask(result ?? new DisposableResult { Name = Handled }); + }, "dummy-state", 1); + } + + private void AssertPrimaryContext(ResilienceContext context, TaskExecution execution) + { + context.IsInitialized.Should().BeTrue(); + context.Should().BeSameAs(_snapshot.Context); + context.Properties.Should().NotBeSameAs(_snapshot.OriginalProperties); + context.Properties.Should().BeSameAs(execution.Properties); + context.CancellationToken.Should().NotBeSameAs(_snapshot.OriginalCancellationToken); + context.CancellationToken.CanBeCanceled.Should().BeTrue(); + + context.Properties.Should().HaveCount(1); + context.Properties.TryGetValue(_myKey, out var value).Should().BeTrue(); + value.Should().Be("dummy-value"); + } + + private void AssertSecondaryContext(ResilienceContext context, TaskExecution execution) + { + context.IsInitialized.Should().BeTrue(); + context.Should().NotBeSameAs(_snapshot.Context); + context.Properties.Should().NotBeSameAs(_snapshot.OriginalProperties); + context.Properties.Should().BeSameAs(execution.Properties); + context.CancellationToken.Should().NotBeSameAs(_snapshot.OriginalCancellationToken); + context.CancellationToken.CanBeCanceled.Should().BeTrue(); + + context.Properties.Should().HaveCount(1); + context.Properties.TryGetValue(_myKey, out var value).Should().BeTrue(); + value.Should().Be("dummy-value"); + } + + private void CreateSnapshot(CancellationToken? token = null) + { + _snapshot = new ContextSnapshot(ResilienceContext.Get().Initialize(isSynchronous: false), new ResilienceProperties(), token ?? _cts.Token); + _snapshot.Context.CancellationToken = _cts.Token; + _snapshot.OriginalProperties.Set(_myKey, "dummy-value"); + } + + private HedgingActionGenerator Generator { get; set; } = args => () => Task.FromResult(new DisposableResult { Name = Handled }); + + private TaskExecution Create() => new(_hedgingHandler.CreateHandler()!); +} diff --git a/src/Polly.Core.Tests/Hedging/HedgingActionGeneratorArgumentsTests.cs b/src/Polly.Core.Tests/Hedging/HedgingActionGeneratorArgumentsTests.cs new file mode 100644 index 00000000000..69acc05ce7b --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/HedgingActionGeneratorArgumentsTests.cs @@ -0,0 +1,15 @@ +using Polly.Hedging; + +namespace Polly.Core.Tests.Hedging; + +public class HedgingActionGeneratorArgumentsTests +{ + [Fact] + public void Ctor_Ok() + { + var args = new HedgingActionGeneratorArguments(ResilienceContext.Get(), 5); + + args.Context.Should().NotBeNull(); + args.Attempt.Should().Be(5); + } +} diff --git a/src/Polly.Core.Tests/Hedging/HedgingActions.cs b/src/Polly.Core.Tests/Hedging/HedgingActions.cs new file mode 100644 index 00000000000..01f409e3682 --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/HedgingActions.cs @@ -0,0 +1,62 @@ +using Polly.Hedging; +using Polly.Utils; + +namespace Polly.Core.Tests.Hedging; + +internal class HedgingActions +{ + private readonly TimeProvider _timeProvider; + + public HedgingActions(TimeProvider timeProvider) + { + _timeProvider = timeProvider; + + Functions = new() + { + GetApples, + GetOranges, + GetPears + }; + + Generator = args => + { + if (args.Attempt <= Functions.Count) + { + return async () => + { + return await Functions[args.Attempt - 1]!(args.Context); + }; + } + + return null; + }; + } + + public HedgingActionGenerator Generator { get; } + + public HedgingActionGenerator EmptyFunctionsProvider { get; } = args => null; + + public List>> Functions { get; } + + private async Task GetApples(ResilienceContext context) + { + await _timeProvider.Delay(TimeSpan.FromSeconds(10), context.CancellationToken); + return "Apples"; + } + + private async Task GetPears(ResilienceContext context) + { + await _timeProvider.Delay(TimeSpan.FromSeconds(3), context.CancellationToken); + return "Pears"; + } + + private async Task GetOranges(ResilienceContext context) + { + await _timeProvider.Delay(TimeSpan.FromSeconds(2), context.CancellationToken); + return "Oranges"; + } + + public static HedgingActionGenerator GetGenerator(Func> task) => args => () => task(args.Context); + + public int MaxHedgedTasks => Functions.Count + 1; +} diff --git a/src/Polly.Core.Tests/Hedging/HedgingHandlerTests.cs b/src/Polly.Core.Tests/Hedging/HedgingHandlerTests.cs index 78e889ea54b..94e68fe213e 100644 --- a/src/Polly.Core.Tests/Hedging/HedgingHandlerTests.cs +++ b/src/Polly.Core.Tests/Hedging/HedgingHandlerTests.cs @@ -63,8 +63,8 @@ public void SetHedging_Empty_Discarded() handler.HedgingActionGenerator = args => () => Task.CompletedTask; }); - handler.IsEmpty.Should().BeTrue(); - handler.CreateHandler().Should().BeNull(); + handler.IsEmpty.Should().BeFalse(); + handler.CreateHandler().Should().NotBeNull(); } [Fact] @@ -85,11 +85,11 @@ public async Task SetHedging_Ok() handler.HandlesHedging().Should().BeTrue(); - var action = handler.TryCreateHedgedAction(ResilienceContext.Get()); + var action = handler.TryCreateHedgedAction(ResilienceContext.Get(), 0); action.Should().NotBeNull(); (await (action!()!)).Should().Be(0); - handler.TryCreateHedgedAction(ResilienceContext.Get()).Should().BeNull(); + handler.TryCreateHedgedAction(ResilienceContext.Get(), 0).Should().BeNull(); } [InlineData(true)] @@ -121,7 +121,7 @@ public async Task SetVoidHedging_Ok(bool returnsNullAction) handler.HandlesHedging().Should().BeTrue(); - var action = handler.TryCreateHedgedAction(ResilienceContext.Get()); + var action = handler.TryCreateHedgedAction(ResilienceContext.Get(), 0); if (returnsNullAction) { action.Should().BeNull(); @@ -155,6 +155,6 @@ public async Task ShouldHandleAsync_UnknownResultType_Null() var args = new HandleHedgingArguments(ResilienceContext.Get()); (await handler!.ShouldHandleAsync(new Outcome(new InvalidOperationException()), args)).Should().BeFalse(); handler.HandlesHedging().Should().BeFalse(); - handler.TryCreateHedgedAction(ResilienceContext.Get()).Should().BeNull(); + handler.TryCreateHedgedAction(ResilienceContext.Get(), 0).Should().BeNull(); } } diff --git a/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyBuilderExtensionsTests.cs b/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyBuilderExtensionsTests.cs index c4850f698ee..7a7c4b54461 100644 --- a/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyBuilderExtensionsTests.cs +++ b/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyBuilderExtensionsTests.cs @@ -51,4 +51,47 @@ public void AddHedgingT_InvalidOptions_Throws() .Throw() .WithMessage("The hedging strategy options are invalid.*"); } + + [Fact] + public async Task AddHedging_IntegrationTest() + { + ConcurrentQueue results = new(); + + var strategy = _builder + .AddHedging(new HedgingStrategyOptions + { + MaxHedgedAttempts = 4, + HedgingDelay = TimeSpan.FromMilliseconds(20), + Handler = new HedgingHandler().SetHedging(handler => + { + handler.ShouldHandle.HandleResult("error"); + handler.HedgingActionGenerator = args => + { + return async () => + { + await Task.Delay(25, args.Context.CancellationToken); + + if (args.Attempt == 3) + { + return "success"; + } + + return "error"; + }; + }; + }), + OnHedging = new Polly.Strategy.OutcomeEvent().Register((outcome, _) => results.Enqueue(outcome.Result!)) + }) + .Build(); + + var result = await strategy.ExecuteAsync(async token => + { + await Task.Delay(25, token); + return "error"; + }); + + result.Should().Be("success"); + results.Should().HaveCountGreaterThan(0); + results.Distinct().Should().ContainSingle("error"); + } } diff --git a/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs b/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs index a636886457b..779b6ba6f6c 100644 --- a/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs +++ b/src/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs @@ -1,10 +1,42 @@ +using System; using Polly.Hedging; +using Polly.Strategy; namespace Polly.Core.Tests.Hedging; -public class HedgingResilienceStrategyTests +public class HedgingResilienceStrategyTests : IDisposable { + private const string Success = "Success"; + + private const string Failure = "Failure"; + + private static readonly TimeSpan LongDelay = TimeSpan.FromDays(1); + private static readonly TimeSpan AssertTimeout = TimeSpan.FromSeconds(10); + private readonly HedgingStrategyOptions _options = new(); + private readonly List _events = new(); + private readonly ResilienceStrategyTelemetry _telemetry; + private readonly HedgingTimeProvider _timeProvider; + private readonly HedgingActions _actions; + private readonly PrimaryStringTasks _primaryTasks; + private readonly List _results = new(); + private readonly CancellationTokenSource _cts = new(); + + public HedgingResilienceStrategyTests() + { + _telemetry = TestUtilities.CreateResilienceTelemetry(args => _events.Add(args)); + _timeProvider = new HedgingTimeProvider { AutoAdvance = _options.HedgingDelay }; + _actions = new HedgingActions(_timeProvider); + _primaryTasks = new PrimaryStringTasks(_timeProvider); + _options.HedgingDelay = TimeSpan.FromSeconds(1); + _options.MaxHedgedAttempts = _actions.MaxHedgedTasks; + } + + public void Dispose() + { + _cts.Dispose(); + _timeProvider.Advance(TimeSpan.FromDays(365)); + } [Fact] public void Ctor_EnsureDefaults() @@ -55,5 +87,725 @@ public async Task GetHedgingDelayAsync_NoGeneratorSet_EnsureCorrectValue() result.Should().Be(TimeSpan.FromMilliseconds(123)); } - private HedgingResilienceStrategy Create() => new(_options); + [Fact] + public async Task ExecuteAsync_ShouldReturnAnyPossibleResult() + { + ConfigureHedging(); + + var strategy = Create(); + var result = await strategy.ExecuteAsync(_primaryTasks.SlowTask); + + result.Should().NotBeNull(); + _timeProvider.DelayEntries.Should().HaveCount(5); + result.Should().Be("Oranges"); + } + + [Fact] + public async void ExecuteAsync_EnsureHedgedTasksCancelled_Ok() + { + // arrange + _options.MaxHedgedAttempts = 2; + using var cancelled = new ManualResetEvent(false); + ConfigureHedging(async context => + { + try + { + await _timeProvider.Delay(LongDelay, context.CancellationToken); + } + catch (OperationCanceledException) + { + cancelled.Set(); + } + + return Failure; + }); + + var strategy = Create(); + + // act + var result = strategy.ExecuteAsync(async token => + { + await _timeProvider.Delay(TimeSpan.FromHours(1), token); + return Success; + }); + + // assert + _timeProvider.Advance(TimeSpan.FromHours(1)); + (await result).Should().Be(Success); + cancelled.WaitOne(AssertTimeout).Should().BeTrue(); + } + + [Fact] + public async Task ExecuteAsync_EnsurePrimaryTaskCancelled_Ok() + { + // arrange + using var cancelled = new ManualResetEvent(false); + ConfigureHedging(async context => + { + await _timeProvider.Delay(TimeSpan.FromHours(1), context.CancellationToken); + return Success; + }); + + var strategy = Create(); + + // act + var task = strategy.ExecuteAsync(async token => + { + try + { + await _timeProvider.Delay(TimeSpan.FromHours(24), token); + } + catch (OperationCanceledException) + { + cancelled.Set(); + } + + return Success; + }); + + // assert + _timeProvider.Advance(TimeSpan.FromHours(2)); + cancelled.WaitOne(TimeSpan.FromSeconds(1)).Should().BeTrue(); + await task; + } + + [Fact] + public async Task ExecuteAsync_EnsureDiscardedResultDisposed() + { + // arrange + using var primaryResult = new DisposableResult(); + using var secondaryResult = new DisposableResult(); + + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = args => + { + return () => + { + return Task.FromResult(secondaryResult); + }; + }; + }); + + var strategy = Create(); + + // act + var result = await strategy.ExecuteAsync(async token => + { +#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods + await _timeProvider.Delay(LongDelay); +#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods + return primaryResult; + }); + + // assert + _timeProvider.Advance(LongDelay); + + await primaryResult.WaitForDisposalAsync(); + primaryResult.IsDisposed.Should().BeTrue(); + secondaryResult.IsDisposed.Should().BeFalse(); + } + + [Fact] + public async Task ExecuteAsync_EveryHedgedTaskShouldHaveDifferentContexts() + { + // arrange + using var cancellationSource = new CancellationTokenSource(); + var beforeKey = new ResiliencePropertyKey("before"); + var afterKey = new ResiliencePropertyKey("after"); + + var primaryContext = ResilienceContext.Get(); + primaryContext.Properties.Set(beforeKey, "before"); + var contexts = new List(); + var tokenHashCodes = new List(); + + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = args => + { + return async () => + { + tokenHashCodes.Add(args.Context.CancellationToken.GetHashCode()); + args.Context.CancellationToken.CanBeCanceled.Should().BeTrue(); + args.Context.Properties.GetValue(beforeKey, "wrong").Should().Be("before"); + contexts.Add(args.Context); + await Task.Yield(); + args.Context.Properties.Set(afterKey, "after"); + return "secondary"; + }; + }; + }); + + var strategy = Create(); + + // act + var result = await strategy.ExecuteAsync( + async (context, _) => + { + context.CancellationToken.CanBeCanceled.Should().BeTrue(); + tokenHashCodes.Add(context.CancellationToken.GetHashCode()); + context.Properties.GetValue(beforeKey, "wrong").Should().Be("before"); + context.Should().Be(primaryContext); + contexts.Add(context); + await _timeProvider.Delay(LongDelay, context.CancellationToken); + return "primary"; + }, + primaryContext, + "dummy"); + + // assert + contexts.Should().HaveCountGreaterThan(1); + contexts.Count.Should().Be(contexts.Distinct().Count()); + _timeProvider.Advance(LongDelay); + tokenHashCodes.Distinct().Should().HaveCountGreaterThan(1); + } + + [Fact] + public async Task ExecuteAsync_EnsureOriginalCancellationTokenRestored() + { + // arrange + using var cancellationSource = new CancellationTokenSource(); + var primaryContext = ResilienceContext.Get(); + primaryContext.CancellationToken = cancellationSource.Token; + ConfigureHedging(TimeSpan.Zero); + var strategy = Create(); + + // act + var result = await strategy.ExecuteAsync((_, _) => Task.FromResult("primary"), primaryContext, "dummy"); + + // assert + primaryContext.CancellationToken.Should().Be(cancellationSource.Token); + } + + [InlineData(true)] + [InlineData(false)] + [Theory] + public async Task ExecuteAsync_EnsurePropertiesConsistency(bool primaryFails) + { + // arrange + _options.MaxHedgedAttempts = 2; + var attempts = _options.MaxHedgedAttempts; + var primaryContext = ResilienceContext.Get(); + var storedProps = primaryContext.Properties; + var contexts = new List(); + var primaryKey = new ResiliencePropertyKey("primary-key"); + var primaryKey2 = new ResiliencePropertyKey("primary-key-2"); + var secondaryKey = new ResiliencePropertyKey("secondary-key"); + storedProps.Set(primaryKey, "primary"); + + ConfigureHedging(args => + { + return async () => + { + contexts.Add(args.Context); + args.Context.Properties.GetValue(primaryKey, string.Empty).Should().Be("primary"); + args.Context.Properties.Set(secondaryKey, "secondary"); + await _timeProvider.Delay(TimeSpan.FromHours(1), args.Context.CancellationToken); + return primaryFails ? Success : Failure; + }; + }); + var strategy = Create(); + + // act + var executeTask = strategy.ExecuteAsync( + async (context, _) => + { + contexts.Add(context); + context.Properties.GetValue(primaryKey, string.Empty).Should().Be("primary"); + context.Properties.Set(primaryKey2, "primary-2"); + await _timeProvider.Delay(TimeSpan.FromHours(2), context.CancellationToken); + return primaryFails ? Failure : Success; + }, + primaryContext, + "state"); + + // assert + + contexts.Should().HaveCount(2); + primaryContext.Properties.Should().HaveCount(2); + primaryContext.Properties.GetValue(primaryKey, string.Empty).Should().Be("primary"); + + if (primaryFails) + { + _timeProvider.Advance(TimeSpan.FromHours(1)); + await executeTask; + primaryContext.Properties.GetValue(secondaryKey, string.Empty).Should().Be("secondary"); + } + else + { + _timeProvider.Advance(TimeSpan.FromHours(2)); + await executeTask; + primaryContext.Properties.GetValue(primaryKey2, string.Empty).Should().Be("primary-2"); + } + + primaryContext.Properties.Should().BeSameAs(storedProps); + } + + [Fact] + public async Task ExecuteAsync_Primary_CustomPropertiesAvailable() + { + // arrange + var key = new ResiliencePropertyKey("my-key"); + var key2 = new ResiliencePropertyKey("my-key-2"); + using var cancellationSource = new CancellationTokenSource(); + var primaryContext = ResilienceContext.Get(); + primaryContext.Properties.Set(key2, "my-value-2"); + primaryContext.CancellationToken = cancellationSource.Token; + var props = primaryContext.Properties; + ConfigureHedging(TimeSpan.Zero); + var strategy = Create(); + + // act + var result = await strategy.ExecuteAsync( + (context, _) => + { + primaryContext.Properties.TryGetValue(key2, out var val).Should().BeTrue(); + val.Should().Be("my-value-2"); + context.Properties.Set(key, "my-value"); + return Task.FromResult("primary"); + }, + primaryContext, "dummy"); + + // assert + primaryContext.Properties.TryGetValue(key, out var val).Should().BeTrue(); + val.Should().Be("my-value"); + primaryContext.Properties.Should().BeSameAs(props); + } + + [Fact] + public async Task ExecuteAsync_Secondary_CustomPropertiesAvailable() + { + // arrange + var key = new ResiliencePropertyKey("my-key"); + var key2 = new ResiliencePropertyKey("my-key-2"); + var primaryContext = ResilienceContext.Get(); + var storedProps = primaryContext.Properties; + primaryContext.Properties.Set(key2, "my-value-2"); + ConfigureHedging(args => + { + return () => + { + args.Context.Properties.TryGetValue(key2, out var val).Should().BeTrue(); + val.Should().Be("my-value-2"); + args.Context.Properties.Set(key, "my-value"); + return Task.FromResult(Success); + }; + }); + var strategy = Create(); + + // act + var result = await strategy.ExecuteAsync((_, _) => Task.FromResult(Failure), primaryContext, "state"); + + // assert + result.Should().Be(Success); + primaryContext.Properties.TryGetValue(key, out var val).Should().BeTrue(); + primaryContext.Properties.Should().BeSameAs(storedProps); + val.Should().Be("my-value"); + } + + [Fact] + public async Task ExecuteAsync_CancellationLinking_Ok() + { + // arrange + _options.MaxHedgedAttempts = 2; + using var primaryCancelled = new ManualResetEvent(false); + using var secondaryCancelled = new ManualResetEvent(false); + using var cancellationSource = new CancellationTokenSource(); + var context = ResilienceContext.Get(); + context.CancellationToken = cancellationSource.Token; + + ConfigureHedging(async context => + { + try + { + await _timeProvider.Delay(TimeSpan.FromDays(0.5), context.CancellationToken); + } + catch (OperationCanceledException) + { + secondaryCancelled.Set(); + throw; + } + + return Success; + }); + + var strategy = Create(); + + // act + var task = strategy.ExecuteAsync( + async (context, _) => + { + try + { + await _primaryTasks.SlowTask(context.CancellationToken); + } + catch (OperationCanceledException) + { + primaryCancelled.Set(); + throw; + } + + return Success; + }, + context, "dummy"); + + // assert + _timeProvider.Advance(TimeSpan.FromHours(4)); + cancellationSource.Cancel(); + _timeProvider.Advance(TimeSpan.FromHours(1)); + await task.Invoking(async t => await t).Should().ThrowAsync(); + + primaryCancelled.WaitOne(AssertTimeout).Should().BeTrue(); + secondaryCancelled.WaitOne(AssertTimeout).Should().BeTrue(); + } + + [Fact] + public async Task ExecuteAsync_EnsureBackgroundWorkInSuccessfulCallNotCancelled() + { + // arrange + using var cts = new CancellationTokenSource(); + List backgroundTasks = new List(); + ConfigureHedging(BackgroundWork); + var strategy = Create(); + + // act + var task = await strategy.ExecuteAsync(SlowTask, cts.Token); + + // assert + _timeProvider.Advance(TimeSpan.FromDays(2)); + + await Assert.ThrowsAsync(() => backgroundTasks[0]); + + // background task is still pending + Assert.False(backgroundTasks[1].IsCompleted); + + cts.Cancel(); + + async Task SlowTask(CancellationToken cancellationToken) + { + var delay = Task.Delay(TimeSpan.FromDays(1), cancellationToken); + backgroundTasks.Add(delay); + await delay; + return Success; + } + + Task BackgroundWork(ResilienceContext resilienceContext) + { + var delay = Task.Delay(TimeSpan.FromDays(24), resilienceContext.CancellationToken); + backgroundTasks.Add(delay); + return Task.FromResult(Success); + } + } + + [Fact] + public async void ExecuteAsync_ZeroHedgingDelay_EnsureAllTasksSpawnedAtOnce() + { + // arrange + int executions = 0; + using var allExecutionsReached = new ManualResetEvent(false); + ConfigureHedging(context => Execute(context.CancellationToken)); + _options.HedgingDelay = TimeSpan.Zero; + + // act + var task = Create().ExecuteAsync(Execute); + + // assert + Assert.True(allExecutionsReached.WaitOne(AssertTimeout)); + _timeProvider.Advance(LongDelay); + await task; + + async Task Execute(CancellationToken token) + { + if (Interlocked.Increment(ref executions) == _options.MaxHedgedAttempts) + { + allExecutionsReached.Set(); + } + + await _timeProvider.Delay(LongDelay, token); + return Success; + } + } + + [Fact] + public void ExecuteAsync_InfiniteHedgingDelay_EnsureNoConcurrentExecutions() + { + // arrange + bool executing = false; + int executions = 0; + using var allExecutions = new ManualResetEvent(true); + ConfigureHedging(context => Execute(context.CancellationToken)); + + // act + var pending = Create().ExecuteAsync(Execute, _cts.Token); + + // assert + Assert.True(allExecutions.WaitOne(AssertTimeout)); + + async Task Execute(CancellationToken token) + { + if (executing) + { + throw new InvalidOperationException("Concurrent execution detected!"); + } + + executing = true; + try + { + if (Interlocked.Increment(ref executions) == _options.MaxHedgedAttempts) + { + allExecutions.Set(); + } + + await _timeProvider.Delay(LongDelay, token); + + return "dummy"; + } + finally + { + executing = false; + } + } + } + + [Fact] + public async Task ExecuteAsync_ExceptionsHandled_ShouldThrowAnyException() + { + int attempts = 0; + + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.HedgingActionGenerator = args => + { + Exception exception = args.Attempt switch + { + 1 => new ArgumentException(), + 2 => new InvalidOperationException(), + 3 => new BadImageFormatException(), + _ => new NotSupportedException() + }; + + attempts++; + return () => throw exception; + }; + }); + + var strategy = Create(); + await strategy.Invoking(s => s.ExecuteAsync(_ => throw new InvalidCastException())).Should().ThrowAsync(); + attempts.Should().Be(3); + } + + [Fact] + public async Task ExecuteAsync_ExceptionsHandled_ShouldThrowLastException() + { + int attempts = 0; + + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.HedgingActionGenerator = args => + { + Exception exception = args.Attempt switch + { + 1 => new ArgumentException(), + 2 => new InvalidOperationException(), + 3 => new BadImageFormatException(), + _ => new NotSupportedException() + }; + + attempts++; + return () => throw exception; + }; + }); + + var strategy = Create(); + await strategy.Invoking(s => s.ExecuteAsync(_ => throw new InvalidCastException())).Should().ThrowAsync(); + attempts.Should().Be(3); + } + + [Fact] + public async Task ExecuteAsync_PrimaryExceptionNotHandled_Rethrow() + { + int attempts = 0; + + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleException(); + handler.HedgingActionGenerator = args => + { + attempts++; + return null; + }; + }); + + var strategy = Create(); + await strategy.Invoking(s => s.ExecuteAsync(_ => throw new InvalidCastException())).Should().ThrowAsync(); + attempts.Should().Be(0); + } + + [Fact] + public async Task ExecuteAsync_ExceptionsHandled_ShouldReturnLastResult() + { + int attempts = 0; + + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.ShouldHandle.HandleException(); + handler.HedgingActionGenerator = args => + { + Exception? exception = args.Attempt switch + { + 1 => new ArgumentException(), + 2 => new InvalidOperationException(), + 3 => null, + _ => new NotSupportedException() + }; + + attempts++; + return () => + { + if (exception != null) + { + throw exception; + } + + return Task.FromResult(Success); + }; + }; + }); + + var strategy = Create(); + var result = await strategy.ExecuteAsync(_ => throw new InvalidCastException()); + result.Should().Be(Success); + } + + [Fact] + public async Task ExecuteAsync_EnsureHedgingDelayGeneratorRespected() + { + var delay = TimeSpan.FromMilliseconds(12345); + _options.HedgingDelayGenerator.SetGenerator(_ => TimeSpan.FromMilliseconds(12345)); + + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = args => () => Task.FromResult(Success); + }); + + var strategy = Create(); + var task = strategy.ExecuteAsync(async token => + { + await _timeProvider.Delay(TimeSpan.FromDays(1), token); + throw new InvalidCastException(); + }); + + _timeProvider.Advance(TimeSpan.FromHours(5)); + (await task).Should().Be(Success); + _timeProvider.DelayEntries.Should().Contain(e => e.Delay == delay); + } + + [Fact] + public async Task ExecuteAsync_EnsureExceptionStackTracePreserved() + { + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = args => null; + }); + + var strategy = Create(); + + var exception = await strategy.Invoking(s => s.ExecuteAsync(PrimaryTaskThatThrowsError)).Should().ThrowAsync(); + + exception.WithMessage("Forced Error"); + exception.And.StackTrace.Should().Contain(nameof(PrimaryTaskThatThrowsError)); + + static Task PrimaryTaskThatThrowsError(CancellationToken cancellationToken) => throw new InvalidOperationException("Forced Error"); + } + + [Fact] + public async Task ExecuteAsync_EnsureOnHedgingCalled() + { + var attempts = new List(); + _options.OnHedging.Register((o, args) => + { + o.Result.Should().Be(Failure); + attempts.Add(args.Attempt); + }); + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleResult(Failure); + handler.HedgingActionGenerator = args => () => Task.FromResult(Failure); + }); + + var strategy = Create(); + await strategy.ExecuteAsync(_ => Task.FromResult(Failure)); + + attempts.Should().HaveCount(_options.MaxHedgedAttempts); + attempts.Should().BeInAscendingOrder(); + attempts[0].Should().Be(0); + } + + [Fact] + public async Task ExecuteAsync_EnsureOnHedgingTelemetry() + { + var context = ResilienceContext.Get(); + + ConfigureHedging(handler => + { + handler.ShouldHandle.HandleResult(Failure); + handler.HedgingActionGenerator = args => () => Task.FromResult(Failure); + }); + + var strategy = Create(); + await strategy.ExecuteAsync((_, _) => Task.FromResult(Failure), context, "state"); + + context.ResilienceEvents.Should().HaveCount(_options.MaxHedgedAttempts); + context.ResilienceEvents.Select(v => v.EventName).Distinct().Should().ContainSingle("OnHedging"); + } + + private void ConfigureHedging() + { + _options.OnHedging.Register((outcome, _) => + { + lock (_results) + { + _results.Add(outcome.Result!); + } + }); + + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = _actions.Generator; + }); + } + + private void ConfigureHedging(Func> background) + { + ConfigureHedging(args => () => background(args.Context)); + } + + private void ConfigureHedging(HedgingActionGenerator generator) + { + ConfigureHedging(handler => + { + handler.HedgingActionGenerator = generator; + handler.ShouldHandle.HandleResult(Failure); + }); + } + + private void ConfigureHedging(Action> configure) => _options.Handler.SetHedging(configure); + + private void ConfigureHedging(TimeSpan delay) => ConfigureHedging(args => async () => + { + await Task.Delay(delay); + return "secondary"; + }); + + private HedgingResilienceStrategy Create() => new(_options, _timeProvider, _telemetry); } diff --git a/src/Polly.Core.Tests/Hedging/HedgingStrategyOptionsTResultTests.cs b/src/Polly.Core.Tests/Hedging/HedgingStrategyOptionsTResultTests.cs index 000444175a2..98e93053a7d 100644 --- a/src/Polly.Core.Tests/Hedging/HedgingStrategyOptionsTResultTests.cs +++ b/src/Polly.Core.Tests/Hedging/HedgingStrategyOptionsTResultTests.cs @@ -81,7 +81,7 @@ public async Task AsNonGenericOptions_Ok() var handler = nonGeneric.Handler.CreateHandler(); handler.Should().NotBeNull(); - (await handler!.TryCreateHedgedAction(ResilienceContext.Get())!()).Should().Be(555); + (await handler!.TryCreateHedgedAction(ResilienceContext.Get(), 0)!()).Should().Be(555); var result = await handler!.ShouldHandleAsync(new Outcome(-1), new HandleHedgingArguments(ResilienceContext.Get())); result.Should().BeTrue(); diff --git a/src/Polly.Core.Tests/Hedging/HedgingTimeProvider.cs b/src/Polly.Core.Tests/Hedging/HedgingTimeProvider.cs new file mode 100644 index 00000000000..f2bddba91ff --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/HedgingTimeProvider.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading.Tasks; +using Polly.Utils; + +namespace Polly.Core.Tests.Hedging; + +internal class HedgingTimeProvider : TimeProvider +{ + private DateTimeOffset _utcNow; + + public HedgingTimeProvider() + : base(Stopwatch.Frequency) => _utcNow = DateTimeOffset.UtcNow; + + public TimeSpan AutoAdvance { get; set; } + + public void Advance(TimeSpan diff) + { + _utcNow = _utcNow.Add(diff); + + foreach (var entry in DelayEntries.Where(e => e.TimeStamp <= _utcNow)) + { + entry.Complete(); + } + } + + public List DelayEntries { get; } = new List(); + + public override DateTimeOffset UtcNow => _utcNow; + + public override void CancelAfter(CancellationTokenSource source, TimeSpan delay) + { + throw new NotSupportedException(); + } + + public override Task Delay(TimeSpan delayValue, CancellationToken cancellationToken = default) + { + var entry = new DelayEntry(delayValue, new TaskCompletionSource(), _utcNow.Add(delayValue)); + cancellationToken.Register(() => entry.Source.TrySetCanceled(cancellationToken)); + DelayEntries.Add(entry); + + Advance(AutoAdvance); + + return entry.Source.Task; + } + + public override long GetTimestamp() => throw new NotSupportedException(); + + public record DelayEntry(TimeSpan Delay, TaskCompletionSource Source, DateTimeOffset TimeStamp) + { + public void Complete() + { + Source.TrySetResult(true); + } + } +} diff --git a/src/Polly.Core.Tests/Hedging/PrimaryStringTasks.cs b/src/Polly.Core.Tests/Hedging/PrimaryStringTasks.cs new file mode 100644 index 00000000000..8e86bbab11f --- /dev/null +++ b/src/Polly.Core.Tests/Hedging/PrimaryStringTasks.cs @@ -0,0 +1,33 @@ +using Polly.Utils; + +namespace Polly.Core.Tests.Hedging; + +internal class PrimaryStringTasks +{ + public const string InstantTaskResult = "Instant"; + + public const string FastTaskResult = "I am fast!"; + + public const string SlowTaskResult = "I am so slow!"; + + private readonly TimeProvider _timeProvider; + + public PrimaryStringTasks(TimeProvider timeProvider) => _timeProvider = timeProvider; + + public static Task InstantTask() + { + return Task.FromResult(InstantTaskResult); + } + + public async Task FastTask(CancellationToken token) + { + await _timeProvider.Delay(TimeSpan.FromMilliseconds(10), token); + return FastTaskResult; + } + + public async Task SlowTask(CancellationToken token) + { + await _timeProvider.Delay(TimeSpan.FromDays(1), token); + return SlowTaskResult; + } +} diff --git a/src/Polly.Core.Tests/Helpers/DisposableResult.cs b/src/Polly.Core.Tests/Helpers/DisposableResult.cs new file mode 100644 index 00000000000..df5cc1e6134 --- /dev/null +++ b/src/Polly.Core.Tests/Helpers/DisposableResult.cs @@ -0,0 +1,25 @@ +namespace Polly.Core.Tests.Helpers; + +internal sealed class DisposableResult : IDisposable +{ + public readonly TaskCompletionSource OnDisposed = new(); + + public DisposableResult() => Name = ""; + + public DisposableResult(string name) => Name = name; + + public string Name { get; set; } + + public bool IsDisposed { get; private set; } + + public Task WaitForDisposalAsync() => OnDisposed.Task; + + public void Dispose() + { + IsDisposed = true; + + OnDisposed.TrySetResult(true); + } + + public override string ToString() => Name; +} diff --git a/src/Polly.Core.Tests/ResilienceContextTests.cs b/src/Polly.Core.Tests/ResilienceContextTests.cs index b1eb4f84e17..b41597f49aa 100644 --- a/src/Polly.Core.Tests/ResilienceContextTests.cs +++ b/src/Polly.Core.Tests/ResilienceContextTests.cs @@ -81,6 +81,25 @@ public void Initialize_Typed_Ok(bool synchronous) context.ContinueOnCapturedContext.Should().BeFalse(); } + [InlineData(true)] + [InlineData(false)] + [Theory] + public void Initialize_From_Ok(bool synchronous) + { + var context = ResilienceContext.Get(); + context.Initialize(synchronous); + context.ContinueOnCapturedContext = true; + + var other = ResilienceContext.Get(); + other.InitializeFrom(context); + + other.ResultType.Should().Be(typeof(bool)); + other.IsVoid.Should().BeFalse(); + other.IsInitialized.Should().BeTrue(); + other.IsSynchronous.Should().Be(synchronous); + other.ContinueOnCapturedContext.Should().BeTrue(); + } + [InlineData(true)] [InlineData(false)] [Theory] diff --git a/src/Polly.Core.Tests/ResiliencePropertiesTests.cs b/src/Polly.Core.Tests/ResiliencePropertiesTests.cs index 7afceb6f607..347b3ee1d11 100644 --- a/src/Polly.Core.Tests/ResiliencePropertiesTests.cs +++ b/src/Polly.Core.Tests/ResiliencePropertiesTests.cs @@ -56,6 +56,36 @@ public void TryGetValue_IncorrectType_NotFound() props.TryGetValue(key2, out var val).Should().Be(false); } + [Fact] + public void Clear_Ok() + { + var key1 = new ResiliencePropertyKey("dummy"); + var key2 = new ResiliencePropertyKey("dummy"); + + var props = new ResilienceProperties(); + props.Set(key1, 12345); + props.Clear(); + + props.Should().HaveCount(0); + } + + [Fact] + public void Replace_Ok() + { + var key1 = new ResiliencePropertyKey("A"); + var key2 = new ResiliencePropertyKey("B"); + + var props = new ResilienceProperties(); + props.Set(key1, "A"); + + var otherProps = new ResilienceProperties(); + otherProps.Set(key2, "B"); + + props.Replace(otherProps); + props.Should().HaveCount(1); + props.GetValue(key2, "").Should().Be("B"); + } + [Fact] public void DictionaryOperations_Ok() { diff --git a/src/Polly.Core.Tests/Utils/DisposeHelperTests.cs b/src/Polly.Core.Tests/Utils/DisposeHelperTests.cs new file mode 100644 index 00000000000..72657964445 --- /dev/null +++ b/src/Polly.Core.Tests/Utils/DisposeHelperTests.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading.Tasks; +using Moq; +using Polly.Utils; + +namespace Polly.Core.Tests.Utils; + +public class DisposeHelperTests +{ + [Fact] + public void Dispose_Object_Ok() + { + DisposeHelper.TryDisposeSafeAsync(new object()).AsTask().IsCompleted.Should().BeTrue(); + } + + [Fact] + public async Task Dispose_Disposable_Ok() + { + using var disposable = new DisposableResult(); + await DisposeHelper.TryDisposeSafeAsync(disposable); + disposable.IsDisposed.Should().BeTrue(); + } + + [Fact] + public async Task Dispose_AsyncDisposable_Ok() + { + var disposable = new Mock(); + bool disposed = false; + disposable.Setup(v => v.DisposeAsync()).Returns(default(ValueTask)).Callback(() => disposed = true); + + await DisposeHelper.TryDisposeSafeAsync(disposable.Object); + + disposed.Should().BeTrue(); + } + + [Fact] + public async Task Dispose_DisposeThrows_ExceptionHandled() + { + var disposable = new Mock(); + disposable.Setup(v => v.DisposeAsync()).Throws(new InvalidOperationException()); + + await disposable.Object.Invoking(async _ => await DisposeHelper.TryDisposeSafeAsync(disposable.Object)).Should().NotThrowAsync(); + } +} diff --git a/src/Polly.Core/Hedging/Controller/ContextSnapshot.cs b/src/Polly.Core/Hedging/Controller/ContextSnapshot.cs new file mode 100644 index 00000000000..19ee70f1bfb --- /dev/null +++ b/src/Polly.Core/Hedging/Controller/ContextSnapshot.cs @@ -0,0 +1,3 @@ +namespace Polly.Hedging.Utils; + +internal readonly record struct ContextSnapshot(ResilienceContext Context, ResilienceProperties OriginalProperties, CancellationToken OriginalCancellationToken); diff --git a/src/Polly.Core/Hedging/Controller/HedgedTaskType.cs b/src/Polly.Core/Hedging/Controller/HedgedTaskType.cs new file mode 100644 index 00000000000..dcba8143c04 --- /dev/null +++ b/src/Polly.Core/Hedging/Controller/HedgedTaskType.cs @@ -0,0 +1,7 @@ +namespace Polly.Hedging.Utils; + +internal enum HedgedTaskType +{ + Primary, + Secondary +} diff --git a/src/Polly.Core/Hedging/Controller/HedgingController.cs b/src/Polly.Core/Hedging/Controller/HedgingController.cs new file mode 100644 index 00000000000..1cab2b0cfe8 --- /dev/null +++ b/src/Polly.Core/Hedging/Controller/HedgingController.cs @@ -0,0 +1,55 @@ +using Polly.Hedging.Controller; +using Polly.Utils; + +namespace Polly.Hedging.Utils; + +internal sealed class HedgingController +{ + private readonly IObjectPool _contextPool; + private readonly IObjectPool _executionPool; + private int _rentedContexts; + private int _rentedExecutions; + + public HedgingController(TimeProvider provider, HedgingHandler.Handler handler, int maxAttempts) + { + _executionPool = new ObjectPool(() => + { + Interlocked.Increment(ref _rentedExecutions); + return new TaskExecution(handler); + }, + _ => + { + Interlocked.Decrement(ref _rentedExecutions); + + // Stryker disable once Boolean : no means to test this + return true; + }); + + _contextPool = new ObjectPool( + () => + { + Interlocked.Increment(ref _rentedContexts); + return new HedgingExecutionContext(_executionPool, provider, maxAttempts, ReturnContext); + }, + _ => + { + Interlocked.Decrement(ref _rentedContexts); + + // Stryker disable once Boolean : no means to test this + return true; + }); + } + + public int RentedContexts => _rentedContexts; + + public int RentedExecutions => _rentedExecutions; + + public HedgingExecutionContext GetContext(ResilienceContext context) + { + var executionContext = _contextPool.Get(); + executionContext.Initialize(context); + return executionContext; + } + + private void ReturnContext(HedgingExecutionContext context) => _contextPool.Return(context); +} diff --git a/src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs b/src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs new file mode 100644 index 00000000000..ec4ced86e1d --- /dev/null +++ b/src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs @@ -0,0 +1,263 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Polly.Hedging.Controller; +using Polly.Strategy; +using Polly.Utils; +using static Polly.Hedging.Utils.HedgingExecutionContext; + +namespace Polly.Hedging.Utils; + +#pragma warning disable CA1031 // Do not catch general exception types + +/// +/// The context associated with an execution of hedging resilience strategy. +/// It holds the resources for all executed hedged tasks (primary + secondary) and is responsible for resource disposal. +/// +internal sealed class HedgingExecutionContext +{ + public readonly record struct ExecutionInfo(TaskExecution? Execution, bool Loaded, Outcome? Outcome); + + private readonly List _tasks = new(); + private readonly List _executingTasks = new(); + private readonly IObjectPool _executionPool; + private readonly TimeProvider _timeProvider; + private readonly int _maxAttempts; + private readonly Action _onReset; + private readonly ResilienceProperties _replacedProperties = new(); + + public HedgingExecutionContext(IObjectPool executionPool, TimeProvider timeProvider, int maxAttempts, Action onReset) + { + _executionPool = executionPool; + _timeProvider = timeProvider; + _maxAttempts = maxAttempts; + _onReset = onReset; + } + + internal void Initialize(ResilienceContext context) + { + Snapshot = new ContextSnapshot(context, context.Properties, context.CancellationToken); + _replacedProperties.Replace(Snapshot.OriginalProperties); + Snapshot.Context.Properties = _replacedProperties; + } + + public int LoadedTasks => _tasks.Count; + + public ContextSnapshot Snapshot { get; private set; } + + public bool IsInitialized => Snapshot.Context != null; + + public IReadOnlyList Tasks => _tasks; + + private bool ContinueOnCapturedContext => Snapshot.Context.ContinueOnCapturedContext; + + public async ValueTask> LoadExecutionAsync(Func> primaryCallback, TState state) + { + if (LoadedTasks >= _maxAttempts) + { + return CreateExecutionInfoWhenNoExecution(); + } + + // determine what type of task we are creating + var type = LoadedTasks switch + { + 0 => HedgedTaskType.Primary, + _ => HedgedTaskType.Secondary + }; + + var execution = _executionPool.Get(); + + if (await execution.InitializeAsync(type, Snapshot, primaryCallback, state, LoadedTasks).ConfigureAwait(ContinueOnCapturedContext)) + { + // we were able to start a new execution, register it + _tasks.Add(execution); + _executingTasks.Add(execution); + return new ExecutionInfo(execution, true, null); + } + else + { + _executionPool.Return(execution); + return CreateExecutionInfoWhenNoExecution(); + } + } + + public void Complete() + { + if (LoadedTasks == 0) + { + throw new InvalidOperationException("The hedging must execute at least one task."); + } + + UpdateOriginalContext(); + + // first, cancel any pending tasks + foreach (var pair in _executingTasks) + { + pair.Cancel(); + } + + // We are intentionally doing the cleanup in the background as we do not want to + // delay the hedging. + // The background cleanup is safe. All exceptions are handled. + _ = CleanupInBackgroundAsync(); + } + + public async ValueTask TryWaitForCompletedExecutionAsync(TimeSpan hedgingDelay) + { + // before doing anything expensive, let's check whether any existing task is already completed + if (TryRemoveExecutedTask() is TaskExecution execution) + { + return execution; + } + + if (LoadedTasks == _maxAttempts) + { + await WaitForTaskCompetitionAsync().ConfigureAwait(ContinueOnCapturedContext); + return TryRemoveExecutedTask(); + } + + if (hedgingDelay == TimeSpan.Zero || LoadedTasks == 0) + { + // just load the next task + return null; + } + + // Stryker disable once equality : no means to test this, stryker changes '<' to '<=' where 0 is already covered in the branch above + if (hedgingDelay < TimeSpan.Zero) + { + await WaitForTaskCompetitionAsync().ConfigureAwait(ContinueOnCapturedContext); + return TryRemoveExecutedTask(); + } + + using var delayTaskCancellation = CancellationTokenSource.CreateLinkedTokenSource(Snapshot.Context.CancellationToken); + + var delayTask = _timeProvider.DelayAsync(hedgingDelay, Snapshot.Context); + Task whenAnyHedgedTask = WaitForTaskCompetitionAsync(); + var completedTask = await Task.WhenAny(whenAnyHedgedTask, delayTask).ConfigureAwait(ContinueOnCapturedContext); + + if (completedTask == delayTask) + { + return null; + } + + // cancel the ongoing delay task + // Stryker disable once boolean : no means to test this + delayTaskCancellation.Cancel(throwOnFirstException: false); + await whenAnyHedgedTask.ConfigureAwait(ContinueOnCapturedContext); + + return TryRemoveExecutedTask(); + } + + private ExecutionInfo CreateExecutionInfoWhenNoExecution() + { + // if there are no more executing tasks we need to check finished ones + if (_executingTasks.Count == 0) + { + var finishedExecution = _tasks.First(static t => t.ExecutionTask!.IsCompleted); + finishedExecution.AcceptOutcome(); + return new ExecutionInfo(null, false, finishedExecution.Outcome.AsOutcome()); + } + + return new ExecutionInfo(null, false, null); + } + + private Task WaitForTaskCompetitionAsync() + { +#pragma warning disable S109 // Magic numbers should not be used + return _executingTasks.Count switch + { + 1 => AwaitTask(_executingTasks[0], ContinueOnCapturedContext), + 2 => Task.WhenAny(_executingTasks[0].ExecutionTask!, _executingTasks[1].ExecutionTask!), + _ => Task.WhenAny(_executingTasks.Select(v => v.ExecutionTask!)) + }; +#pragma warning restore S109 // Magic numbers should not be used + + static async Task AwaitTask(TaskExecution task, bool continueOnCapturedContext) + { + // ExecutionTask never fails + await task.ExecutionTask!.ConfigureAwait(continueOnCapturedContext); + return Task.FromResult(task); + } + } + + private TaskExecution? TryRemoveExecutedTask() + { + int? index = null; + + for (var i = 0; i < _executingTasks.Count; i++) + { + var task = _executingTasks[i]; + if (task.ExecutionTask!.IsCompleted) + { + index = i; + break; + } + } + + if (index is not null) + { + var execution = _executingTasks[index.Value]; + _executingTasks.RemoveAt(index.Value); + return execution; + } + + return null; + } + + private void UpdateOriginalContext() + { + var originalContext = Snapshot.Context; + originalContext.CancellationToken = Snapshot.OriginalCancellationToken; + originalContext.Properties = Snapshot.OriginalProperties; + + int accepted = 0; + TaskExecution? acceptedExecution = null; + + foreach (var task in Tasks) + { + if (task.IsAccepted) + { + accepted++; + acceptedExecution = task; + } + } + + if (accepted != 1) + { + throw new InvalidOperationException($"There must be exactly one accepted outcome for hedging. Found {accepted}."); + } + + originalContext.Properties.Replace(acceptedExecution!.Properties); + + if (acceptedExecution.Type == HedgedTaskType.Secondary) + { + foreach (var @event in acceptedExecution.Context.ResilienceEvents) + { + originalContext.AddResilienceEvent(@event); + } + } + } + + private async Task CleanupInBackgroundAsync() + { + foreach (var task in _tasks) + { + await task.ExecutionTask!.ConfigureAwait(false); + await task.ResetAsync().ConfigureAwait(false); + _executionPool.Return(task); + } + + Reset(); + } + + private void Reset() + { + _replacedProperties.Clear(); + _tasks.Clear(); + + _executingTasks.Clear(); + Snapshot = default; + + _onReset(this); + } +} diff --git a/src/Polly.Core/Hedging/Controller/TaskExecution.cs b/src/Polly.Core/Hedging/Controller/TaskExecution.cs new file mode 100644 index 00000000000..f9b707841cb --- /dev/null +++ b/src/Polly.Core/Hedging/Controller/TaskExecution.cs @@ -0,0 +1,212 @@ +using System; +using Polly.Hedging.Utils; +using Polly.Strategy; + +namespace Polly.Hedging.Controller; + +#pragma warning disable CA1031 // Do not catch general exception types + +/// +/// Represents a single hedging attempt execution alongside all the necessary resources. These are: +/// +/// - Distinct instance for this execution. One exception are primary task where the main context is reused. +/// - The cancellation token associated with the execution. +/// +internal sealed class TaskExecution +{ + private readonly ResilienceContext _cachedContext = ResilienceContext.Get(); + private readonly HedgingHandler.Handler _handler; + private CancellationTokenSource? _cancellationSource; + private CancellationTokenRegistration? _cancellationRegistration; + private ResilienceContext? _activeContext; + + public TaskExecution(HedgingHandler.Handler handler) => _handler = handler; + + public Task? ExecutionTask { get; private set; } + + public Outcome Outcome { get; private set; } + + public bool IsHandled { get; private set; } + + public bool IsAccepted { get; private set; } + + public ResilienceProperties Properties { get; } = new(); + + public ResilienceContext Context => _activeContext ?? throw new InvalidOperationException("TaskExecution is not initialized."); + + public HedgedTaskType Type { get; set; } + + public Action? OnReset { get; set; } + + public void AcceptOutcome() + { + if (ExecutionTask?.IsCompleted == true) + { + IsAccepted = true; + } + else + { + throw new InvalidOperationException("Unable to accept outcome for a task that is not completed."); + } + } + + public void Cancel() + { + if (IsAccepted) + { + return; + } + + _cancellationSource!.Cancel(); + } + + public async ValueTask InitializeAsync( + HedgedTaskType type, + ContextSnapshot snapshot, + Func> primaryCallback, + TState state, + int attempt) + { + Type = type; + _cancellationSource = CancellationTokenSourcePool.Get(); + Properties.Replace(snapshot.OriginalProperties); + + if (snapshot.OriginalCancellationToken.CanBeCanceled) + { + _cancellationRegistration = snapshot.OriginalCancellationToken.Register(o => ((CancellationTokenSource)o!).Cancel(), _cancellationSource); + } + + PrepareContext(ref snapshot); + + if (type == HedgedTaskType.Secondary) + { + Func>? action = null; + + try + { + action = _handler.TryCreateHedgedAction(Context, attempt); + if (action == null) + { + await ResetAsync().ConfigureAwait(false); + return false; + } + } + catch (Exception e) + { + ExecutionTask = ExecuteCreateActionException(e); + return true; + } + + ExecutionTask = ExecuteSecondaryActionAsync(action); + } + else + { + ExecutionTask = ExecutePrimaryActionAsync(primaryCallback, state); + } + + return true; + } + + public async ValueTask ResetAsync() + { + OnReset?.Invoke(this); + + if (_cancellationRegistration is CancellationTokenRegistration registration) + { +#if NETCOREAPP + await registration.DisposeAsync().ConfigureAwait(false); +#else + registration.Dispose(); +#endif + } + + _cancellationRegistration = null; + + if (!IsAccepted) + { + await DisposeHelper.TryDisposeSafeAsync(Outcome.Result!).ConfigureAwait(false); + + // not accepted executions are always cancelled, so the cancellation source must be + // disposed instead of returning it to the pool + _cancellationSource!.Dispose(); + } + else + { + // accepted outcome means that the cancellation source can be be returned to the pool + // since it was most likely not cancelled + CancellationTokenSourcePool.Return(_cancellationSource!); + } + + IsAccepted = false; + Outcome = default; + IsHandled = false; + Properties.Clear(); + OnReset = null; + _activeContext = null; + _cachedContext.Reset(); + _cancellationSource = null!; + } + + private async Task ExecuteSecondaryActionAsync(Func> action) + { + Outcome outcome; + + try + { + var result = await action().ConfigureAwait(Context.ContinueOnCapturedContext); + outcome = new Outcome(result); + } + catch (Exception e) + { + outcome = new Outcome(e); + } + + await UpdateOutcomeAsync(outcome).ConfigureAwait(Context.ContinueOnCapturedContext); + } + + private async Task ExecuteCreateActionException(Exception e) + { + await UpdateOutcomeAsync(new Outcome(e)).ConfigureAwait(Context.ContinueOnCapturedContext); + } + + private async Task ExecutePrimaryActionAsync(Func> primaryCallback, TState state) + { + Outcome outcome; + + try + { + var result = await primaryCallback(Context, state).ConfigureAwait(Context.ContinueOnCapturedContext); + outcome = new Outcome(result); + } + catch (Exception e) + { + outcome = new Outcome(e); + } + + await UpdateOutcomeAsync(outcome).ConfigureAwait(Context.ContinueOnCapturedContext); + } + + private async Task UpdateOutcomeAsync(Outcome outcome) + { + Outcome = outcome.AsOutcome(); + IsHandled = await _handler.ShouldHandleAsync(outcome, new HandleHedgingArguments(Context)).ConfigureAwait(Context.ContinueOnCapturedContext); + } + + private void PrepareContext(ref ContextSnapshot snapshot) + { + if (Type == HedgedTaskType.Primary) + { + // now just replace the properties + _activeContext = snapshot.Context; + } + else + { + // secondary hedged tasks get their own unique context + _activeContext = _cachedContext; + _activeContext.InitializeFrom(snapshot.Context); + } + + _activeContext.Properties = Properties; + _activeContext.CancellationToken = _cancellationSource!.Token; + } +} diff --git a/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.TResult.cs b/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.TResult.cs index 3cbbaca6b81..bc115574490 100644 --- a/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.TResult.cs +++ b/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.TResult.cs @@ -10,9 +10,18 @@ namespace Polly.Hedging; /// The type of the result. public readonly struct HedgingActionGeneratorArguments : IResilienceArguments { - internal HedgingActionGeneratorArguments(ResilienceContext context) => Context = context; + internal HedgingActionGeneratorArguments(ResilienceContext context, int attempt) + { + Context = context; + Attempt = attempt; + } /// public ResilienceContext Context { get; } + + /// + /// Gets the zero-based hedging attempt number. + /// + public int Attempt { get; } } diff --git a/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.cs b/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.cs index cf1162f42ea..a046ab22657 100644 --- a/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.cs +++ b/src/Polly.Core/Hedging/HedgingActionGeneratorArguments.cs @@ -9,8 +9,17 @@ namespace Polly.Hedging; /// public readonly struct HedgingActionGeneratorArguments : IResilienceArguments { - internal HedgingActionGeneratorArguments(ResilienceContext context) => Context = context; + internal HedgingActionGeneratorArguments(ResilienceContext context, int attempt) + { + Context = context; + Attempt = attempt; + } /// public ResilienceContext Context { get; } + + /// + /// Gets the zero-based hedging attempt number. + /// + public int Attempt { get; } } diff --git a/src/Polly.Core/Hedging/HedgingConstants.cs b/src/Polly.Core/Hedging/HedgingConstants.cs index 23a28fb401c..7f2365b55cf 100644 --- a/src/Polly.Core/Hedging/HedgingConstants.cs +++ b/src/Polly.Core/Hedging/HedgingConstants.cs @@ -4,6 +4,8 @@ internal static class HedgingConstants { public const string StrategyType = "Hedging"; + public const string OnHedgingEventName = "OnHedging"; + public const int DefaultMaxHedgedAttempts = 2; public const int MinimumHedgedAttempts = 2; diff --git a/src/Polly.Core/Hedging/HedgingHandler.Handler.cs b/src/Polly.Core/Hedging/HedgingHandler.Handler.cs index 56822bd7349..ad4c3beaf7d 100644 --- a/src/Polly.Core/Hedging/HedgingHandler.Handler.cs +++ b/src/Polly.Core/Hedging/HedgingHandler.Handler.cs @@ -6,27 +6,35 @@ public partial class HedgingHandler { internal sealed class Handler { - private readonly OutcomePredicate.Handler _handler; + private readonly OutcomePredicate.Handler? _predicateHandler; private readonly Dictionary _generators; - internal Handler(OutcomePredicate.Handler handler, Dictionary generators) + internal Handler(OutcomePredicate.Handler? handler, Dictionary generators) { - _handler = handler; + _predicateHandler = handler; _generators = generators; } public bool HandlesHedging() => _generators.ContainsKey(typeof(TResult)); - public ValueTask ShouldHandleAsync(Outcome outcome, HandleHedgingArguments arguments) => _handler.ShouldHandleAsync(outcome, arguments); + public ValueTask ShouldHandleAsync(Outcome outcome, HandleHedgingArguments arguments) + { + if (_predicateHandler == null) + { + return new ValueTask(false); + } + + return _predicateHandler.ShouldHandleAsync(outcome, arguments); + } - public Func>? TryCreateHedgedAction(ResilienceContext context) + public Func>? TryCreateHedgedAction(ResilienceContext context, int attempt) { if (!_generators.TryGetValue(typeof(TResult), out var generator)) { return null; } - return ((HedgingActionGenerator)generator)(new HedgingActionGeneratorArguments(context)); + return ((HedgingActionGenerator)generator)(new HedgingActionGeneratorArguments(context, attempt)); } } } diff --git a/src/Polly.Core/Hedging/HedgingHandler.cs b/src/Polly.Core/Hedging/HedgingHandler.cs index 87a6ebb0045..8f987a702b0 100644 --- a/src/Polly.Core/Hedging/HedgingHandler.cs +++ b/src/Polly.Core/Hedging/HedgingHandler.cs @@ -13,7 +13,7 @@ public sealed partial class HedgingHandler /// /// Gets a value indicating whether the hedging handler is empty. /// - public bool IsEmpty => _predicates.IsEmpty; + public bool IsEmpty => _actions.Count == 0; /// /// Configures a hedging handler for a specific result type. @@ -30,11 +30,8 @@ public HedgingHandler SetHedging(Action> config ValidationHelper.ValidateObject(handler, "The hedging handler configuration is invalid."); - if (!handler.ShouldHandle.IsEmpty) - { - _predicates.SetPredicates(handler.ShouldHandle); - _actions[typeof(TResult)] = handler.HedgingActionGenerator!; - } + _predicates.SetPredicates(handler.ShouldHandle); + _actions[typeof(TResult)] = handler.HedgingActionGenerator!; return this; } @@ -53,31 +50,27 @@ public HedgingHandler SetVoidHedging(Action configure) ValidationHelper.ValidateObject(handler, "The hedging handler configuration is invalid."); - if (!handler.ShouldHandle.IsEmpty) - { - _predicates.SetVoidPredicates(handler.ShouldHandle); - _actions[typeof(VoidResult)] = CreateGenericGenerator(handler.HedgingActionGenerator!); - } + _predicates.SetVoidPredicates(handler.ShouldHandle); + _actions[typeof(VoidResult)] = CreateGenericGenerator(handler.HedgingActionGenerator!); return this; } internal Handler? CreateHandler() { - var shouldHandle = _predicates.CreateHandler(); - if (shouldHandle == null) + if (_actions.Count == 0) { return null; } - return new Handler(shouldHandle, _actions); + return new Handler(_predicates.CreateHandler(), _actions); } private static HedgingActionGenerator CreateGenericGenerator(HedgingActionGenerator generator) { return (args) => { - Func? action = generator(new HedgingActionGeneratorArguments(args.Context)); + Func? action = generator(new HedgingActionGeneratorArguments(args.Context, args.Attempt)); if (action == null) { return null; diff --git a/src/Polly.Core/Hedging/HedgingResilienceStrategy.cs b/src/Polly.Core/Hedging/HedgingResilienceStrategy.cs index 291a0638bba..8b07cff9599 100644 --- a/src/Polly.Core/Hedging/HedgingResilienceStrategy.cs +++ b/src/Polly.Core/Hedging/HedgingResilienceStrategy.cs @@ -1,15 +1,32 @@ +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; +using System.Threading; using Polly.Hedging; +using Polly.Hedging.Utils; +using Polly.Strategy; +using Polly.Utils; namespace Polly; internal sealed class HedgingResilienceStrategy : ResilienceStrategy { - public HedgingResilienceStrategy(HedgingStrategyOptions options) + private readonly ResilienceStrategyTelemetry _telemetry; + private readonly HedgingController? _controller; + + public HedgingResilienceStrategy(HedgingStrategyOptions options, TimeProvider timeProvider, ResilienceStrategyTelemetry telemetry) { HedgingDelay = options.HedgingDelay; MaxHedgedAttempts = options.MaxHedgedAttempts; HedgingDelayGenerator = options.HedgingDelayGenerator.CreateHandler(HedgingConstants.DefaultHedgingDelay, static _ => true); HedgingHandler = options.Handler.CreateHandler(); + OnHedgingHandler = options.OnHedging.CreateHandler(); + + _telemetry = telemetry; + if (HedgingHandler != null) + { + _controller = new HedgingController(timeProvider, HedgingHandler, options.MaxHedgedAttempts); + } } public TimeSpan HedgingDelay { get; } @@ -20,9 +37,74 @@ public HedgingResilienceStrategy(HedgingStrategyOptions options) public HedgingHandler.Handler? HedgingHandler { get; } - protected internal override ValueTask ExecuteCoreAsync(Func> callback, ResilienceContext context, TState state) + public OutcomeEvent.Handler? OnHedgingHandler { get; } + + protected internal override async ValueTask ExecuteCoreAsync(Func> callback, ResilienceContext context, TState state) + { + if (_controller == null || !HedgingHandler!.HandlesHedging()) + { + return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext); + } + + var continueOnCapturedContext = context.ContinueOnCapturedContext; + context.CancellationToken.ThrowIfCancellationRequested(); + + // create hedging execution context + var hedgingContext = _controller.GetContext(context); + + try + { + while (true) + { + if ((await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext)).Outcome is Outcome outcome) + { + return HandleOutcome(outcome); + } + + var delay = await GetHedgingDelayAsync(context, hedgingContext.LoadedTasks).ConfigureAwait(continueOnCapturedContext); + var execution = await hedgingContext.TryWaitForCompletedExecutionAsync(delay).ConfigureAwait(continueOnCapturedContext); + if (execution is null) + { + // If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay. + // We will create additional hedged task in the next iteration. + continue; + } + + outcome = execution.Outcome.AsOutcome(); + + if (!execution.IsHandled) + { + execution.AcceptOutcome(); + return HandleOutcome(outcome); + } + + var onHedgingArgs = new OnHedgingArguments(context, hedgingContext.LoadedTasks - 1); + _telemetry.Report(HedgingConstants.OnHedgingEventName, outcome, onHedgingArgs); + + if (OnHedgingHandler != null) + { + // If nothing has been returned or thrown yet, the result is a transient failure, + // and other hedged request will be awaited. + // Before it, one needs to perform the task adjacent to each hedged call. + await OnHedgingHandler.HandleAsync(outcome, onHedgingArgs).ConfigureAwait(continueOnCapturedContext); + } + } + } + finally + { + hedgingContext.Complete(); + } + } + + [ExcludeFromCodeCoverage] + private static TResult HandleOutcome(Outcome outcome) { - return callback(context, state); + if (outcome.Exception is not null) + { + ExceptionDispatchInfo.Capture(outcome.Exception).Throw(); + } + + return outcome.Result!; } internal ValueTask GetHedgingDelayAsync(ResilienceContext context, int attempt) @@ -34,5 +116,4 @@ internal ValueTask GetHedgingDelayAsync(ResilienceContext context, int return HedgingDelayGenerator(new HedgingDelayArguments(context, attempt)); } - } diff --git a/src/Polly.Core/Hedging/HedgingResilienceStrategyBuilderExtensions.cs b/src/Polly.Core/Hedging/HedgingResilienceStrategyBuilderExtensions.cs index a5565a2ab5f..fb33fd4d2bc 100644 --- a/src/Polly.Core/Hedging/HedgingResilienceStrategyBuilderExtensions.cs +++ b/src/Polly.Core/Hedging/HedgingResilienceStrategyBuilderExtensions.cs @@ -37,6 +37,6 @@ public static ResilienceStrategyBuilder AddHedging(this ResilienceStrategyBuilde ValidationHelper.ValidateObject(options, "The hedging strategy options are invalid."); - return builder.AddStrategy(context => new HedgingResilienceStrategy(options), options); + return builder.AddStrategy(context => new HedgingResilienceStrategy(options, context.TimeProvider, context.Telemetry), options); } } diff --git a/src/Polly.Core/Polly.Core.csproj b/src/Polly.Core/Polly.Core.csproj index da9fead6574..87ee858cb7d 100644 --- a/src/Polly.Core/Polly.Core.csproj +++ b/src/Polly.Core/Polly.Core.csproj @@ -23,6 +23,7 @@ + diff --git a/src/Polly.Core/ResilienceContext.cs b/src/Polly.Core/ResilienceContext.cs index ff368c74d44..02f36a94dd2 100644 --- a/src/Polly.Core/ResilienceContext.cs +++ b/src/Polly.Core/ResilienceContext.cs @@ -56,7 +56,7 @@ private ResilienceContext() /// /// Gets the custom properties attached to the context. /// - public ResilienceProperties Properties { get; } = new(); + public ResilienceProperties Properties { get; internal set; } = new(); /// /// Gets the collection of resilience events that occurred while executing the resilience strategy. @@ -76,6 +76,16 @@ private ResilienceContext() /// public static ResilienceContext Get() => Pool.Get(); + internal void InitializeFrom(ResilienceContext context) + { + ResultType = context.ResultType; + IsSynchronous = context.IsSynchronous; + CancellationToken = context.CancellationToken; + ContinueOnCapturedContext = context.ContinueOnCapturedContext; + _resilienceEvents.Clear(); + _resilienceEvents.AddRange(context.ResilienceEvents); + } + /// /// Returns a back to the pool. /// @@ -108,7 +118,7 @@ internal void AddResilienceEvent(ReportedResilienceEvent @event) _resilienceEvents.Add(@event); } - private bool Reset() + internal bool Reset() { IsSynchronous = false; ResultType = typeof(UnknownResult); diff --git a/src/Polly.Core/ResilienceProperties.cs b/src/Polly.Core/ResilienceProperties.cs index 908b25d3cb5..6e04ba7e812 100644 --- a/src/Polly.Core/ResilienceProperties.cs +++ b/src/Polly.Core/ResilienceProperties.cs @@ -58,6 +58,18 @@ public void Set(ResiliencePropertyKey key, TValue value) Options[key.Key] = value; } + internal void Replace(ResilienceProperties other) + { + Clear(); + + foreach (var pair in other.Options) + { + Options[pair.Key] = pair.Value; + } + } + + internal void Clear() => Options.Clear(); + /// object? IDictionary.this[string key] { diff --git a/src/Polly.Core/Strategy/Outcome.cs b/src/Polly.Core/Strategy/Outcome.cs index fd84fc908df..0a5d4d2e1cc 100644 --- a/src/Polly.Core/Strategy/Outcome.cs +++ b/src/Polly.Core/Strategy/Outcome.cs @@ -31,6 +31,11 @@ public Outcome(Type resultType, object? result) Result = result; } + /// + /// Gets a value indicating whether the outcome is valid. + /// + public bool IsValid => ResultType != null; + /// /// Gets the exception that occurred during the operation, if any. /// @@ -87,4 +92,14 @@ public override string ToString() return Result?.ToString() ?? string.Empty; } + + internal Outcome AsOutcome() + { + if (Exception != null) + { + return new Outcome(Exception); + } + + return new Outcome((TResult)Result!); + } } diff --git a/src/Polly.Core/Utils/DisposeHelper.cs b/src/Polly.Core/Utils/DisposeHelper.cs new file mode 100644 index 00000000000..149968616c9 --- /dev/null +++ b/src/Polly.Core/Utils/DisposeHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; + +namespace Polly.Utils; + +#pragma warning disable CA1031 // Do not catch general exception types + +internal static class DisposeHelper +{ + public static async ValueTask TryDisposeSafeAsync(T value) + { + try + { + if (value is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + else if (value is IDisposable disposable) + { + disposable.Dispose(); + } + } + catch (Exception e) + { + Debug.Assert(false, e.ToString()); + } + } +} diff --git a/src/Polly.Core/Utils/IObjectPool.cs b/src/Polly.Core/Utils/IObjectPool.cs new file mode 100644 index 00000000000..43cb038cb20 --- /dev/null +++ b/src/Polly.Core/Utils/IObjectPool.cs @@ -0,0 +1,23 @@ +namespace Polly.Utils; + +#pragma warning disable CA1716 // Identifiers should not match keywords + +/// +/// Represents a pool of objects. +/// +/// The type of objects in the pool. +internal interface IObjectPool + where T : class +{ + /// + /// Gets an object from the pool. + /// + /// Object instance. + T Get(); + + /// + /// Returns an object to the pool. + /// + /// The object instance to return. + void Return(T obj); +} diff --git a/src/Polly.Core/Utils/ObjectPool.cs b/src/Polly.Core/Utils/ObjectPool.cs index ac59352dacc..bb735167be4 100644 --- a/src/Polly.Core/Utils/ObjectPool.cs +++ b/src/Polly.Core/Utils/ObjectPool.cs @@ -5,12 +5,12 @@ namespace Polly.Utils; // copied from https://raw.githubusercontent.com/dotnet/aspnetcore/53124ab8cbf152f59120982f1c74e802e7970845/src/ObjectPool/src/DefaultObjectPool.cs -internal sealed class ObjectPool +internal sealed class ObjectPool : IObjectPool where T : class { internal static readonly int MaxCapacity = (Environment.ProcessorCount * 2) - 1; // the - 1 is to account for _fastItem - private readonly Func _createFunc; + private readonly Func, T> _createFunc; private readonly Func _returnFunc; private readonly ConcurrentQueue _items = new(); @@ -19,6 +19,11 @@ internal sealed class ObjectPool private int _numItems; public ObjectPool(Func createFunc, Func returnFunc) + : this(_ => createFunc(), returnFunc) + { + } + + public ObjectPool(Func, T> createFunc, Func returnFunc) { _createFunc = createFunc; _returnFunc = returnFunc; @@ -36,7 +41,7 @@ public T Get() } // no object available, so go get a brand new one - return _createFunc(); + return _createFunc(this); } return item;