Skip to content

Commit

Permalink
#1687 - Make ResilienceContextPool settable via DI (#1693)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmeyertons authored Oct 17, 2023
1 parent bb08832 commit a3ca9d0
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 38 deletions.
2 changes: 2 additions & 0 deletions src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#nullable enable
Polly.ResiliencePipelineBuilderBase.ContextPool.get -> Polly.ResilienceContextPool?
Polly.ResiliencePipelineBuilderBase.ContextPool.set -> void
14 changes: 9 additions & 5 deletions src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,25 @@ public RegistryPipelineComponentBuilder(
_configure = configure;
}

internal PipelineComponent CreateComponent()
internal (ResilienceContextPool? contextPool, PipelineComponent component) CreateComponent()
{
var builder = CreateBuilder();
var component = builder.ComponentFactory();

if (builder.ReloadTokens.Count == 0)
{
return component;
return (builder.Instance.ContextPool, component);
}

return PipelineComponentFactory.CreateReloadable(
component = PipelineComponentFactory.CreateReloadable(
new ReloadableComponent.Entry(component, builder.ReloadTokens, builder.Telemetry),
() =>
{
var builder = CreateBuilder();
return new ReloadableComponent.Entry(builder.ComponentFactory(), builder.ReloadTokens, builder.Telemetry);
});

return (builder.Instance.ContextPool, component);
}

private Builder CreateBuilder()
Expand All @@ -69,11 +71,13 @@ private Builder CreateBuilder()
return PipelineComponentFactory.WithExecutionTracking(innerComponent, timeProvider);
},
context.ReloadTokens,
telemetry);
telemetry,
builder);
}

private record Builder(
Func<PipelineComponent> ComponentFactory,
List<CancellationToken> ReloadTokens,
ResilienceStrategyTelemetry Telemetry);
ResilienceStrategyTelemetry Telemetry,
TBuilder Instance);
}
8 changes: 5 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ public ResiliencePipeline<TResult> GetOrAdd(TKey key, Action<ResiliencePipelineB

return _pipelines.GetOrAdd(key, k =>
{
var component = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder<TResult>, TKey>(
var componentBuilder = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder<TResult>, TKey>(
_activator,
k,
_builderNameFormatter(k),
_instanceNameFormatter?.Invoke(k),
configure).CreateComponent();
configure);

return new ResiliencePipeline<TResult>(component, DisposeBehavior.Reject);
(var contextPool, var component) = componentBuilder.CreateComponent();

return new ResiliencePipeline<TResult>(component, DisposeBehavior.Reject, contextPool);
});
}

Expand Down
9 changes: 6 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,17 @@ public ResiliencePipeline GetOrAddPipeline(TKey key, Action<ResiliencePipelineBu

return _pipelines.GetOrAdd(key, k =>
{
var component = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder, TKey>(
var componentBuilder = new RegistryPipelineComponentBuilder<ResiliencePipelineBuilder, TKey>(
_activator,
k,
_builderNameFormatter(k),
_instanceNameFormatter?.Invoke(k),
configure).CreateComponent();
configure)
;

return new ResiliencePipeline(component, DisposeBehavior.Reject);
(var contextPool, var component) = componentBuilder.CreateComponent();

return new ResiliencePipeline(component, DisposeBehavior.Reject, contextPool);
});
}

Expand Down
7 changes: 4 additions & 3 deletions src/Polly.Core/ResiliencePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ public sealed partial class ResiliencePipeline
/// <summary>
/// Resilience pipeline that executes the user-provided callback without any additional logic.
/// </summary>
public static readonly ResiliencePipeline Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore);
public static readonly ResiliencePipeline Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore, null);

internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior)
internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior, ResilienceContextPool? pool)
{
Component = component;
DisposeHelper = new ComponentDisposeHelper(component, disposeBehavior);
Pool = pool ?? ResilienceContextPool.Shared;
}

internal static ResilienceContextPool Pool => ResilienceContextPool.Shared;
internal ResilienceContextPool Pool { get; }

internal PipelineComponent Component { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/Polly.Core/ResiliencePipelineBuilder.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ internal ResiliencePipelineBuilder(ResiliencePipelineBuilderBase other)
/// </summary>
/// <returns>An instance of <see cref="ResiliencePipeline{TResult}"/>.</returns>
/// <exception cref="ValidationException">Thrown when this builder has invalid configuration.</exception>
public ResiliencePipeline<TResult> Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow);
public ResiliencePipeline<TResult> Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow, ContextPool);
}
2 changes: 1 addition & 1 deletion src/Polly.Core/ResiliencePipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public sealed class ResiliencePipelineBuilder : ResiliencePipelineBuilderBase
/// </summary>
/// <returns>An instance of <see cref="ResiliencePipeline"/>.</returns>
/// <exception cref="ValidationException">Thrown when this builder has invalid configuration.</exception>
public ResiliencePipeline Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow);
public ResiliencePipeline Build() => new(BuildPipelineComponent(), DisposeBehavior.Allow, ContextPool);
}
16 changes: 14 additions & 2 deletions src/Polly.Core/ResiliencePipelineBuilderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,19 @@ private protected ResiliencePipelineBuilderBase(ResiliencePipelineBuilderBase ot
public string? InstanceName { get; set; }

/// <summary>
/// Gets or sets a <see cref="TimeProvider"/> that is used by strategies that work with time.
/// Gets or sets the <see cref="Polly.ResilienceContextPool"/> associated with the builder.
/// </summary>
/// <remarks>
/// A custom pool can be used to configure custom behavior for creation.
/// This can include setting a default <c>continueOnCapturedContext</c> parameter or custom operation key resolution.
/// </remarks>
/// <value>
/// If the default value of <see langword="null"/> is used, <see cref="ResilienceContextPool.Shared"/> will be used.
/// </value>
public ResilienceContextPool? ContextPool { get; set; }

/// <summary>
/// Gets or sets a <see cref="System.TimeProvider"/> that is used by strategies that work with time.
/// </summary>
/// <remarks>
/// This property is internal until we switch to official System.TimeProvider.
Expand All @@ -67,7 +79,7 @@ private protected ResiliencePipelineBuilderBase(ResiliencePipelineBuilderBase ot
internal TimeProvider TimeProvider { get; set; } = TimeProvider.System;

/// <summary>
/// Gets or sets the <see cref="TelemetryListener"/> that is used by Polly to report resilience events.
/// Gets or sets the <see cref="Polly.Telemetry.TelemetryListener"/> that is used by Polly to report resilience events.
/// </summary>
/// <remarks>
/// This property is used by the telemetry infrastructure and should not be used directly by user code.
Expand Down
6 changes: 3 additions & 3 deletions src/Polly.Core/ResiliencePipelineT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public sealed partial class ResiliencePipeline<T>
/// <summary>
/// Resilience pipeline that executes the user-provided callback without any additional logic.
/// </summary>
public static readonly ResiliencePipeline<T> Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore);
public static readonly ResiliencePipeline<T> Empty = new(PipelineComponent.Empty, DisposeBehavior.Ignore, null);

internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior)
internal ResiliencePipeline(PipelineComponent component, DisposeBehavior disposeBehavior, ResilienceContextPool? pool)
{
// instead of re-implementing individual Execute* methods we
// can just re-use the non-generic ResiliencePipeline and
// call it from Execute* methods in this class
Pipeline = new ResiliencePipeline(component, disposeBehavior);
Pipeline = new ResiliencePipeline(component, disposeBehavior, pool);
DisposeHelper = Pipeline.DisposeHelper;
}

Expand Down
2 changes: 1 addition & 1 deletion test/Polly.Core.Tests/ResiliencePipelineTTests.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task ExecuteAsync_GenericStrategy_Ok(Func<ResiliencePipeline<string
c.ResultType.Should().Be(typeof(string));
c.CancellationToken.CanBeCanceled.Should().BeTrue();
},
}), DisposeBehavior.Allow);
}), DisposeBehavior.Allow, null);

await execute(pipeline);
}
Expand Down
2 changes: 1 addition & 1 deletion test/Polly.Core.Tests/ResiliencePipelineTTests.Sync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void Execute_GenericStrategy_Ok(Action<ResiliencePipeline<string>> execut
c.IsSynchronous.Should().BeTrue();
c.ResultType.Should().Be(typeof(string));
},
}), DisposeBehavior.Allow);
}), DisposeBehavior.Allow, null);

execute(pipeline);
}
Expand Down
4 changes: 2 additions & 2 deletions test/Polly.Core.Tests/ResiliencePipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task DisposeAsync_NullGenericPipeline_OK()
public async Task DisposeAsync_Reject_Throws()
{
var component = Substitute.For<PipelineComponent>();
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Reject);
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Reject, null);

(await pipeline.Invoking(p => p.DisposeHelper.DisposeAsync().AsTask())
.Should()
Expand All @@ -44,7 +44,7 @@ public async Task DisposeAsync_Reject_Throws()
public async Task DisposeAsync_Allowed_Disposed()
{
var component = Substitute.For<PipelineComponent>();
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(component, DisposeBehavior.Allow, null);
await pipeline.DisposeHelper.DisposeAsync();
await pipeline.DisposeHelper.DisposeAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void Execute_NonGeneric_Ok()
var pipeline = new ResiliencePipeline(PipelineComponentFactory.FromStrategy(new Strategy<object>(outcome =>
{
values.Add(outcome.Result);
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(args => "dummy");
pipeline.Execute(args => 0);
Expand All @@ -41,7 +41,7 @@ public void Execute_Generic_Ok()
var pipeline = new ResiliencePipeline(PipelineComponentFactory.FromStrategy(new Strategy<string>(outcome =>
{
values.Add(outcome.Result);
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(args => "dummy");

Expand All @@ -58,7 +58,7 @@ public void Pipeline_TypeCheck_Ok()
{
outcome.Result.Should().Be(-1);
called = true;
})), DisposeBehavior.Allow);
})), DisposeBehavior.Allow, null);

pipeline.Execute(() => -1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task Create_Cancelled_EnsureNoExecution()
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy()),
};

var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow, null);
var context = ResilienceContextPool.Shared.Get();
context.CancellationToken = cancellation.Token;

Expand All @@ -104,7 +104,7 @@ public async Task Create_CancelledLater_EnsureNoExecution()
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy { Before = (_, _) => { executed = true; cancellation.Cancel(); } }),
PipelineComponentFactory.FromStrategy(new TestResilienceStrategy()),
};
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(strategies, new FakeTimeProvider()), DisposeBehavior.Allow, null);
var context = ResilienceContextPool.Shared.Get();
context.CancellationToken = cancellation.Token;

Expand All @@ -118,7 +118,7 @@ public void ExecutePipeline_EnsureTelemetryArgumentsReported()
{
var timeProvider = new FakeTimeProvider();

var pipeline = new ResiliencePipeline(CreateSut(new[] { Substitute.For<PipelineComponent>() }, timeProvider), DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(CreateSut(new[] { Substitute.For<PipelineComponent>() }, timeProvider), DisposeBehavior.Allow, null);
pipeline.Execute(() => { timeProvider.Advance(TimeSpan.FromHours(1)); });

_listener.Events.Should().HaveCount(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Time.Testing;
using Microsoft.Extensions.Time.Testing;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;
Expand All @@ -25,7 +23,7 @@ public async Task DisposeAsync_PendingOperations_Delayed()
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
Expand Down Expand Up @@ -58,7 +56,7 @@ public async Task HasPendingExecutions_Ok()
};

await using var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

component.HasPendingExecutions.Should().BeTrue();
Expand All @@ -84,7 +82,7 @@ public async Task DisposeAsync_Timeout_Ok()
};

var component = new ExecutionTrackingComponent(inner, _timeProvider);
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow).Execute(() => { }));
var execution = Task.Run(() => new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null).Execute(() => { }));
executing.WaitOne();

var disposeTask = component.DisposeAsync().AsTask();
Expand Down Expand Up @@ -115,7 +113,7 @@ public async Task DisposeAsync_WhenRunningMultipleTasks_Ok()
};

var component = new ExecutionTrackingComponent(inner, TimeProvider.System);
var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow);
var pipeline = new ResiliencePipeline(component, Polly.Utils.DisposeBehavior.Allow, null);

for (int i = 0; i < 10; i++)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using Microsoft.Extensions.DependencyInjection;
using Polly.Registry;

namespace Polly.Extensions.Tests.Issues;

public partial class IssuesTests
{
private class CustomResilienceContextPool : ResilienceContextPool
{
public override ResilienceContext Get(ResilienceContextCreationArguments arguments)
{
if (arguments.ContinueOnCapturedContext is null)
{
arguments = new ResilienceContextCreationArguments(arguments.OperationKey, continueOnCapturedContext: true, arguments.CancellationToken);
}

return Shared.Get(arguments);
}

public override void Return(ResilienceContext context) => Shared.Return(context);
}

private class ContextCreationTestStrategy : ResilienceStrategy
{
public int HitCount { get; private set; }

protected override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
context.ContinueOnCapturedContext.Should().BeTrue();

HitCount++;

return await callback(context, state);
}
}

[Fact]
public async Task DynamicContextPool_1687()
{
var pool = new CustomResilienceContextPool();
var strategy = new ContextCreationTestStrategy();
var services = new ServiceCollection();
string key = "my-key";

services.AddResiliencePipelineRegistry<string>(options => options.BuilderFactory = () => new ResiliencePipelineBuilder
{
ContextPool = pool,
});

services.AddResiliencePipeline(key, builder =>
{
builder.ContextPool.Should().Be(pool);
builder.AddStrategy(strategy);
});

// create the pipeline provider
var provider = services.BuildServiceProvider().GetRequiredService<ResiliencePipelineProvider<string>>();

var pipeline = provider.GetPipeline(key);

await pipeline.ExecuteAsync(async ct => await default(ValueTask));

strategy.HitCount.Should().BeGreaterThan(0);
}
}

0 comments on commit a3ca9d0

Please sign in to comment.