Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Polly.PredicateBuilder<TResult>.PredicateBuilder() -> void
Polly.PredicateResult
Polly.Registry.ConfigureBuilderContext<TKey>
Polly.Registry.ConfigureBuilderContext<TKey>.EnableReloads(System.Func<System.Func<System.Threading.CancellationToken>!>! tokenProducerFactory) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.OnPipelineDisposed(System.Action! callback) -> void
Polly.Registry.ConfigureBuilderContext<TKey>.PipelineKey.get -> TKey
Polly.Registry.ResiliencePipelineProvider<TKey>
Polly.Registry.ResiliencePipelineProvider<TKey>.ResiliencePipelineProvider() -> void
Expand Down
13 changes: 13 additions & 0 deletions src/Polly.Core/Registry/ConfigureBuilderContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ internal ConfigureBuilderContext(TKey strategyKey, string builderName, string? b

internal Func<Func<CancellationToken>>? ReloadTokenProducer { get; private set; }

internal List<Action> DisposeCallbacks { get; } = new();

/// <summary>
/// Enables dynamic reloading of the strategy retrieved from <see cref="ResiliencePipelineRegistry{TKey}"/>.
/// </summary>
Expand All @@ -48,4 +50,15 @@ public void EnableReloads(Func<Func<CancellationToken>> tokenProducerFactory)

ReloadTokenProducer = tokenProducerFactory;
}

/// <summary>
/// Registers a callback that is called when the pipeline instance being configured is disposed.
/// </summary>
/// <param name="callback">The callback delegate.</param>
public void OnPipelineDisposed(Action callback)
{
Guard.NotNull(callback);

DisposeCallbacks.Add(callback);
}
}
6 changes: 5 additions & 1 deletion src/Polly.Core/Registry/RegistryPipelineComponentBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ private Builder CreateBuilder()
_configure(builder, context);

return new(
builder.BuildPipelineComponent,
() => PipelineComponentFactory.WithDisposableCallbacks(
builder.BuildPipelineComponent(),
context.DisposeCallbacks),
context.ReloadTokenProducer,
context.DisposeCallbacks,
builder.TelemetryListener);
}

private record Builder(
Func<PipelineComponent> ComponentFactory,
Func<Func<CancellationToken>>? ReloadTokenProducer,
List<Action> DisposeCallbacks,
TelemetryListener? Listener);
}
43 changes: 43 additions & 0 deletions src/Polly.Core/Utils/Pipeline/ComponentWithDisposeCallbacks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace Polly.Utils.Pipeline;

internal class ComponentWithDisposeCallbacks : PipelineComponent
{
private readonly List<Action> _callbacks;

public ComponentWithDisposeCallbacks(PipelineComponent component, List<Action> callbacks)
{
Component = component;
_callbacks = callbacks;
}

internal PipelineComponent Component { get; }

public override void Dispose()
{
ExecuteCallbacks();

Component.Dispose();
}

public override ValueTask DisposeAsync()
{
ExecuteCallbacks();

return Component.DisposeAsync();
}

internal override ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state) => Component.ExecuteCore(callback, context, state);

private void ExecuteCallbacks()
{
foreach (var callback in _callbacks)
{
callback();
}

_callbacks.Clear();
}
}
11 changes: 11 additions & 0 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using Polly.Telemetry;

namespace Polly.Utils.Pipeline;

internal static class PipelineComponentFactory
Expand All @@ -13,6 +14,16 @@ internal static class PipelineComponentFactory

public static PipelineComponent FromStrategy<T>(ResilienceStrategy<T> strategy) => new BridgeComponent<T>(strategy);

public static PipelineComponent WithDisposableCallbacks(PipelineComponent component, IEnumerable<Action> callbacks)
{
if (!callbacks.Any())
{
return component;
}

return new ComponentWithDisposeCallbacks(component, callbacks.ToList());
}

public static PipelineComponent CreateComposite(
IReadOnlyList<PipelineComponent> components,
ResilienceStrategyTelemetry telemetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Polly.Registry;
using Polly.Utils;

namespace Polly.DependencyInjection;

Expand Down Expand Up @@ -63,4 +64,15 @@ internal AddResiliencePipelineContext(ConfigureBuilderContext<TKey> registryCont

return name == null ? monitor.CurrentValue : monitor.Get(name);
}

/// <summary>
/// Registers a callback that is called when the pipeline instance being configured is disposed.
/// </summary>
/// <param name="callback">The callback delegate.</param>
public void OnPipelineDisposed(Action callback)
{
Guard.NotNull(callback);

RegistryContext.OnPipelineDisposed(callback);
}
}
1 change: 1 addition & 0 deletions src/Polly.Extensions/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ abstract Polly.Telemetry.MeteringEnricher.Enrich<TResult, TArgs>(in Polly.Teleme
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.EnableReloads<TOptions>(string? name = null) -> void
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.GetOptions<TOptions>(string? name = null) -> TOptions
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.OnPipelineDisposed(System.Action! callback) -> void
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.PipelineKey.get -> TKey
Polly.DependencyInjection.AddResiliencePipelineContext<TKey>.ServiceProvider.get -> System.IServiceProvider!
Polly.PollyServiceCollectionExtensions
Expand Down
6 changes: 5 additions & 1 deletion src/Polly.Testing/ResiliencePipelineExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static object GetStrategyInstance<T>(PipelineComponent component)
return component;
}

private static bool ShouldSkip(object instance) => instance is ReloadableComponent;
private static bool ShouldSkip(object instance) => instance is ReloadableComponent || instance is ComponentWithDisposeCallbacks;

private static void ExpandComponents(this PipelineComponent component, List<PipelineComponent> components)
{
Expand All @@ -78,6 +78,10 @@ private static void ExpandComponents(this PipelineComponent component, List<Pipe
components.Add(reloadable);
ExpandComponents(reloadable.Component, components);
}
else if (component is ComponentWithDisposeCallbacks callbacks)
{
ExpandComponents(callbacks.Component, components);
}
else
{
components.Add(component);
Expand Down
31 changes: 31 additions & 0 deletions test/Polly.Core.Tests/Registry/ResiliencePipelineRegistryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,37 @@ public void EnableReloads_Ok()
tries.Should().Be(retryCount + 1);
}

[Fact]
public void EnableReloads_EnsureDisposedCallbackCalled()
{
// arrange
var registry = new ResiliencePipelineRegistry<string>();
using var changeSource = new CancellationTokenSource();
var disposedCalls = 0;

registry.TryAddBuilder("dummy", (builder, context) =>
{
// this call enables dynamic reloads for the dummy strategy
context.EnableReloads(() => () => changeSource.Token);
context.OnPipelineDisposed(() => disposedCalls++);
builder.AddTimeout(TimeSpan.FromSeconds(1));
});

// act
var strategy = registry.GetPipeline("dummy");

// assert
disposedCalls.Should().Be(0);
strategy.Execute(() => { });

changeSource.Cancel();
disposedCalls.Should().Be(1);
strategy.Execute(() => { });

registry.Dispose();
disposedCalls.Should().Be(2);
}

[Fact]
public void EnableReloads_Generic_Ok()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using NSubstitute;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class ComponentWithDisposeCallbacksTests
{
[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task Dispose_Ok(bool isAsync)
{
// Arrange
var called1 = 0;
var called2 = 0;

var callbacks = new List<Action>
{
() => called1++,
() => called2++
};
var component = Substitute.For<PipelineComponent>();
var sut = new ComponentWithDisposeCallbacks(component, callbacks);

// Act
if (isAsync)
{
await sut.DisposeAsync();
await sut.DisposeAsync();
await component.Received(2).DisposeAsync();
}
else
{
sut.Dispose();
#pragma warning disable S3966 // Objects should not be disposed more than once
sut.Dispose();
#pragma warning restore S3966 // Objects should not be disposed more than once
component.Received(2).Dispose();
}

// Assert
callbacks.Should().BeEmpty();
called1.Should().Be(1);
called2.Should().Be(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using NSubstitute;
using Polly.Utils.Pipeline;

namespace Polly.Core.Tests.Utils.Pipeline;

public class PipelineComponentFactoryTests
{
[Fact]
public void WithDisposableCallbacks_NoCallbacks_ReturnsOriginalComponent()
{
var component = Substitute.For<PipelineComponent>();
var result = PipelineComponentFactory.WithDisposableCallbacks(component, new List<Action>());
result.Should().BeSameAs(component);
}

[Fact]
public void PipelineComponentFactory_Should_Return_WrapperComponent_With_Callbacks()
{
var component = Substitute.For<PipelineComponent>();
var callbacks = new List<Action> { () => { } };

var result = PipelineComponentFactory.WithDisposableCallbacks(component, callbacks);

result.Should().BeOfType<ComponentWithDisposeCallbacks>();
}
}
58 changes: 58 additions & 0 deletions test/Polly.Extensions.Tests/DisposablePipelineTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Threading.RateLimiting;
using Microsoft.Extensions.DependencyInjection;
using Polly.RateLimiting;
using Polly.Registry;

namespace Polly.Extensions.Tests;

public class DisposablePipelineTests
{
[Fact]
public void DisposePipeline_EnsureLinkedResourcesDisposedToo()
{
var limiters = new List<RateLimiter>();

var provider = new ServiceCollection()
.AddResiliencePipeline("my-pipeline", (builder, context) =>
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions
{
PermitLimit = 1,
QueueLimit = 1
});
limiters.Add(limiter);

builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args => limiter.AcquireAsync(1, args.Context.CancellationToken)
});

// when the pipeline instance is disposed, limiter is disposed too
context.OnPipelineDisposed(() => limiter.Dispose());
})
.BuildServiceProvider();

limiters.Should().HaveCount(0);
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("my-pipeline");
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("my-pipeline");
limiters.Should().HaveCount(1);
IsDisposed(limiters[0]).Should().BeFalse();

provider.Dispose();
limiters.Should().HaveCount(1);
IsDisposed(limiters[0]).Should().BeTrue();
}

private static bool IsDisposed(RateLimiter limiter)
{
try
{
limiter.AcquireAsync(1).AsTask().GetAwaiter().GetResult();
return false;
}
catch (ObjectDisposedException)
{
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void GetPipelineDescriptor_Reloadable_Ok()
var strategy = registry.GetOrAddPipeline("dummy", (builder, context) =>
{
context.EnableReloads(() => () => CancellationToken.None);

context.OnPipelineDisposed(() => { });
builder
.AddConcurrencyLimiter(10)
.AddStrategy(_ => new CustomStrategy(), new TestOptions());
Expand Down