Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alpha fixes and improvements #1319

Merged
merged 5 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixes and PR comments
  • Loading branch information
martintmk committed Jun 19, 2023
commit 92339656cb080bff9f2c81f54cb1b44bc436ef88
28 changes: 10 additions & 18 deletions src/Polly.Core/Hedging/Controller/HedgingExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Polly.Hedging.Utils;
/// The context associated with an execution of hedging resilience strategy.
/// It holds the resources for all executed hedged tasks (primary + secondary) and is responsible for resource disposal.
/// </summary>
internal sealed class HedgingExecutionContext<T>
internal sealed class HedgingExecutionContext<T> : IAsyncDisposable
{
public readonly record struct ExecutionInfo<TResult>(TaskExecution<T>? Execution, bool Loaded, Outcome<TResult>? Outcome);

Expand Down Expand Up @@ -81,7 +81,7 @@ public async ValueTask<ExecutionInfo<TResult>> LoadExecutionAsync<TResult, TStat
}
}

public Task CompleteAsync()
public async ValueTask DisposeAsync()
{
UpdateOriginalContext();

Expand All @@ -91,10 +91,14 @@ public Task CompleteAsync()
pair.Cancel();
}

// We are intentionally doing the cleanup in the background as we do not want to
// delay the hedging.
// The background cleanup is safe. All exceptions are handled.
return CleanupInBackgroundAsync();
foreach (var task in _tasks)
{
await task.ExecutionTaskSafe!.ConfigureAwait(false);
await task.ResetAsync().ConfigureAwait(false);
_executionPool.Return(task);
}

Reset();
}

public async ValueTask<TaskExecution<T>?> TryWaitForCompletedExecutionAsync(TimeSpan hedgingDelay)
Expand Down Expand Up @@ -226,18 +230,6 @@ private void UpdateOriginalContext()
}
}

private async Task CleanupInBackgroundAsync()
{
foreach (var task in _tasks)
{
await task.ExecutionTaskSafe!.ConfigureAwait(false);
await task.ResetAsync().ConfigureAwait(false);
_executionPool.Return(task);
}

Reset();
}

private void Reset()
{
_replacedProperties.Clear();
Expand Down
100 changes: 56 additions & 44 deletions src/Polly.Core/Hedging/HedgingResilienceStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using Polly.Hedging.Utils;
using Polly.Telemetry;

Expand Down Expand Up @@ -37,6 +39,7 @@ public HedgingResilienceStrategy(

public EventInvoker<OnHedgingArguments>? OnHedging { get; }

[ExcludeFromCodeCoverage] // coverlet issue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific issue we could link to?

Copy link
Contributor Author

@martintmk martintmk Jun 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one for example;
coverlet-coverage/coverlet#1284

There are more of those related to await handling

protected internal override async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
Expand All @@ -47,58 +50,67 @@ protected internal override async ValueTask<Outcome<TResult>> ExecuteCoreAsync<T
return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}

var continueOnCapturedContext = context.ContinueOnCapturedContext;
var cancellationToken = context.CancellationToken;

// create hedging execution context
var hedgingContext = _controller.GetContext(context);

try
{
while (true)
{
if (cancellationToken.IsCancellationRequested)
{
return new Outcome<TResult>(new OperationCanceledException(cancellationToken).TrySetStackTrace());
}

if ((await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext)).Outcome is Outcome<TResult> outcome)
{
return outcome;
}

var delay = await GetHedgingDelayAsync(context, hedgingContext.LoadedTasks).ConfigureAwait(continueOnCapturedContext);
var execution = await hedgingContext.TryWaitForCompletedExecutionAsync(delay).ConfigureAwait(continueOnCapturedContext);
if (execution is null)
{
// If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay.
// We will create additional hedged task in the next iteration.
continue;
}

outcome = execution.Outcome.AsOutcome<TResult>();

if (!execution.IsHandled)
{
execution.AcceptOutcome();
return outcome;
}

var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(context, outcome, new OnHedgingArguments(context, hedgingContext.LoadedTasks - 1));
_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(continueOnCapturedContext);
}
}
return await ExecuteCoreAsync(hedgingContext, callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
finally
{
await hedgingContext.CompleteAsync().ConfigureAwait(continueOnCapturedContext);
await hedgingContext.DisposeAsync().ConfigureAwait(context.ContinueOnCapturedContext);
}
}

private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
HedgingExecutionContext<T> hedgingContext,
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
while (true)
{
var continueOnCapturedContext = context.ContinueOnCapturedContext;
var cancellationToken = context.CancellationToken;

if (cancellationToken.IsCancellationRequested)
{
return new Outcome<TResult>(new OperationCanceledException(cancellationToken).TrySetStackTrace());
}

if ((await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext)).Outcome is Outcome<TResult> outcome)
{
return outcome;
}

var delay = await GetHedgingDelayAsync(context, hedgingContext.LoadedTasks).ConfigureAwait(continueOnCapturedContext);
var execution = await hedgingContext.TryWaitForCompletedExecutionAsync(delay).ConfigureAwait(continueOnCapturedContext);
if (execution is null)
{
// If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay.
// We will create additional hedged task in the next iteration.
continue;
}

outcome = execution.Outcome.AsOutcome<TResult>();

if (!execution.IsHandled)
{
execution.AcceptOutcome();
return outcome;
}

var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(context, outcome, new OnHedgingArguments(context, hedgingContext.LoadedTasks - 1));
_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(continueOnCapturedContext);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Polly.Core/Utils/ExceptionUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ internal static class ExceptionUtilities
{
#if !NET6_0_OR_GREATER
private static readonly FieldInfo StackTraceString = typeof(Exception).GetField("_stackTraceString", BindingFlags.NonPublic | BindingFlags.Instance)!;
private static readonly Type TraceFormat = Type.GetType("System.Diagnostics.StackTrace")!.GetNestedType("TraceFormat", BindingFlags.NonPublic)!;
private static readonly Type TraceFormat = typeof(StackTrace).GetNestedType("TraceFormat", BindingFlags.NonPublic)!;
private static readonly MethodInfo TraceToString = typeof(StackTrace).GetMethod("ToString", BindingFlags.NonPublic | BindingFlags.Instance, null, new[] { TraceFormat }, null)!;
private static readonly object[] TraceToStringArgs = new[] { Enum.GetValues(TraceFormat).GetValue(0) };
#endif

public static T TrySetStackTrace<T>(this T exception)
Expand All @@ -29,7 +30,7 @@ public static T TrySetStackTrace<T>(this T exception)
#if !NET6_0_OR_GREATER
private static void SetStackTrace(this Exception target, StackTrace stack)
{
var getStackTraceString = TraceToString.Invoke(stack, new[] { Enum.GetValues(TraceFormat).GetValue(0) });
var getStackTraceString = TraceToString.Invoke(stack, TraceToStringArgs);
StackTraceString.SetValue(target, getStackTraceString);
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public async Task Pooling_Ok()
controller.RentedContexts.Should().Be(2);
controller.RentedExecutions.Should().Be(2);

await context1.CompleteAsync();
await context2.CompleteAsync();
await context1.DisposeAsync();
await context2.DisposeAsync();

controller.RentedContexts.Should().Be(0);
controller.RentedExecutions.Should().Be(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public async Task Complete_EnsureOriginalContextPreparedWithAcceptedOutcome(bool
context.Tasks.First(v => v.Type == type).AcceptOutcome();

// act
await context.CompleteAsync();
await context.DisposeAsync();

// assert
_resilienceContext.Properties.Should().BeSameAs(originalProps);
Expand All @@ -331,7 +331,7 @@ public async Task Complete_NoTasks_EnsureCleaned()
var props = _resilienceContext.Properties;
var context = Create();
context.Initialize(_resilienceContext);
await context.CompleteAsync();
await context.DisposeAsync();
_resilienceContext.Properties.Should().BeSameAs(props);
}

Expand All @@ -343,7 +343,7 @@ public async Task Complete_NoAcceptedTasks_ShouldNotThrow()
ConfigureSecondaryTasks(TimeSpan.Zero);
await ExecuteAllTasksAsync(context, 2);

context.Invoking(c => c.CompleteAsync().Wait()).Should().NotThrow();
context.Invoking(c => c.DisposeAsync().AsTask().Wait()).Should().NotThrow();
}

[Fact]
Expand All @@ -356,7 +356,7 @@ public async Task Complete_MultipleAcceptedTasks_ShouldNotThrow()
context.Tasks[0].AcceptOutcome();
context.Tasks[1].AcceptOutcome();

context.Invoking(c => c.CompleteAsync().Wait()).Should().NotThrow();
context.Invoking(c => c.DisposeAsync().AsTask().Wait()).Should().NotThrow();
}

[Fact]
Expand Down Expand Up @@ -386,7 +386,7 @@ public async Task Complete_EnsurePendingTasksCleaned()
pending.Wait(10).Should().BeFalse();

context.Tasks[0].AcceptOutcome();
await context.CompleteAsync();
await context.DisposeAsync();

await pending;

Expand All @@ -403,7 +403,7 @@ public async Task Complete_EnsureCleaned()
await ExecuteAllTasksAsync(context, 2);
context.Tasks[0].AcceptOutcome();

await context.CompleteAsync();
await context.DisposeAsync();

context.LoadedTasks.Should().Be(0);
context.Snapshot.Context.Should().BeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ public async Task Execute_CancellationRequested_Throws()

var strategy = Create();
_cts.Cancel();
var context = ResilienceContext.Get();
context.CancellationToken = _cts.Token;

await strategy.Invoking(s => s.ExecuteAsync(_ => new ValueTask<string>(Success), _cts.Token).AsTask())
.Should()
.ThrowAsync<OperationCanceledException>();
var outcome = await strategy.ExecuteOutcomeAsync((c, t) => "dummy".AsOutcomeAsync(), context, "state");
outcome.Exception.Should().BeOfType<OperationCanceledException>();
outcome.Exception!.StackTrace.Should().Contain("Execute_CancellationRequested_Throws");
}

[InlineData(-1)]
Expand Down
13 changes: 13 additions & 0 deletions test/Polly.Core.Tests/Timeout/TimeoutResilienceStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ await sut
_timeProvider.VerifyAll();
}

[Fact]
public async Task Execute_Timeout_EnsureStackTrace()
{
using var cts = new CancellationTokenSource();
SetTimeout(TimeSpan.FromSeconds(2));
_timeProvider.SetupCancelAfterNow(TimeSpan.FromSeconds(2));
var sut = CreateSut();

var outcome = await sut.ExecuteOutcomeAsync(async (c, _) => { await Delay(c.CancellationToken); return "dummy".AsOutcome(); }, ResilienceContext.Get(), "state");
outcome.Exception.Should().BeOfType<TimeoutRejectedException>();
outcome.Exception!.StackTrace.Should().Contain("Execute_Timeout_EnsureStackTrace");
}

[Fact]
public async Task Execute_Cancelled_EnsureNoTimeout()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void Execute_HappyPath()
[InlineData(true, true)]
[InlineData(true, false)]
[Theory]
public void Execute_LeaseRejected(bool hasEvents, bool hasRetryAfter)
public async Task Execute_LeaseRejected(bool hasEvents, bool hasRetryAfter)
{
_diagnosticSource.Setup(v => v.IsEnabled("OnRateLimiterRejected")).Returns(true);
_diagnosticSource.Setup(v => v.Write("OnRateLimiterRejected", It.Is<object>(obj => obj != null)));
Expand Down Expand Up @@ -70,13 +70,17 @@ public void Execute_LeaseRejected(bool hasEvents, bool hasRetryAfter)
}

var strategy = Create();
var context = ResilienceContext.Get();
context.CancellationToken = cts.Token;
var outcome = await strategy.ExecuteOutcomeAsync((_, _) => new ValueTask<Outcome<string>>(new Outcome<string>("dummy")), context, "state");

var assertion = strategy
.Invoking(s => s.Execute(_ => { }, cts.Token))
outcome.Exception
.Should()
.Throw<RateLimiterRejectedException>()
.And
.RetryAfter.Should().Be((TimeSpan?)metadata);
.BeOfType<RateLimiterRejectedException>().Subject
.RetryAfter
.Should().Be((TimeSpan?)metadata);

outcome.Exception!.StackTrace.Should().Contain("Execute_LeaseRejected");

_limiter.VerifyAll();
_lease.VerifyAll();
Expand Down