Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions src/Polly.Core/Hedging/HedgingResilienceStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
ResilienceContext context,
TState state)
{
var attempt = -1;
while (true)
{
attempt++;
var continueOnCapturedContext = context.ContinueOnCapturedContext;
var cancellationToken = context.CancellationToken;

Expand All @@ -79,7 +81,9 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
return new Outcome<TResult>(new OperationCanceledException(cancellationToken).TrySetStackTrace());
}

if ((await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext)).Outcome is Outcome<TResult> outcome)
var loadedExecution = await hedgingContext.LoadExecutionAsync(callback, state).ConfigureAwait(context.ContinueOnCapturedContext);

if (loadedExecution.Outcome is Outcome<TResult> outcome)
{
return outcome;
}
Expand All @@ -90,6 +94,10 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
{
// If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay.
// We will create additional hedged task in the next iteration.
await HandleOnHedgingAsync(
context,
new Outcome<TResult>(default(TResult)),
new OnHedgingArguments(attempt, HasOutcome: false)).ConfigureAwait(context.ContinueOnCapturedContext);
continue;
}

Expand All @@ -101,16 +109,28 @@ private async ValueTask<Outcome<TResult>> ExecuteCoreAsync<TResult, TState>(
return outcome;
}

var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(context, outcome, new OnHedgingArguments(context, hedgingContext.LoadedTasks - 1));
_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);
await HandleOnHedgingAsync(
context,
outcome,
new OnHedgingArguments(attempt, HasOutcome: true)).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(continueOnCapturedContext);
}
private async ValueTask HandleOnHedgingAsync<TResult>(ResilienceContext context, Outcome<TResult> outcome, OnHedgingArguments args)
{
var onHedgingArgs = new OutcomeArguments<TResult, OnHedgingArguments>(
context,
outcome,
args);

_telemetry.Report(HedgingConstants.OnHedgingEventName, onHedgingArgs);

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging.HandleAsync(onHedgingArgs).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/Polly.Core/Hedging/HedgingStrategyOptions.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public class HedgingStrategyOptions<TResult> : ResilienceStrategyOptions
/// </summary>
/// <remarks>
/// Defaults to <see langword="null"/>.
/// <para>
/// The hedging is executed when the current attempt outcome is not successful and the <see cref="ShouldHandle"/> predicate returns <see langword="true"/> or when
/// the current attempt did not finish within the <see cref="HedgingDelay"/>.
/// </para>
/// </remarks>
public Func<OutcomeArguments<TResult, OnHedgingArguments>, ValueTask>? OnHedging { get; set; }
}
7 changes: 5 additions & 2 deletions src/Polly.Core/Hedging/OnHedgingArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ namespace Polly.Hedging;
/// <summary>
/// Represents arguments used by the on-hedging event.
/// </summary>
/// <param name="Context">The context associated with the execution of a user-provided callback.</param>
/// <param name="Attempt">The zero-based hedging attempt number.</param>
public record OnHedgingArguments(ResilienceContext Context, int Attempt);
/// <param name="HasOutcome">
/// Determines whether the outcome is available before loading the next hedged task.
/// No outcome indicates that the previous action did not finish within the hedging delay.
/// </param>
public record OnHedgingArguments(int Attempt, bool HasOutcome);
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void AddHedgingT_InvalidOptions_Throws()
[Fact]
public async Task AddHedging_IntegrationTest()
{
var hedgingWithoutOutcome = false;
ConcurrentQueue<string> results = new();

var strategy = _builder
Expand All @@ -78,7 +79,19 @@ public async Task AddHedging_IntegrationTest()
return "error".AsOutcome().AsOutcome();
};
},
OnHedging = args => { results.Enqueue(args.Result!.ToString()!); return default; }
OnHedging = args =>
{
if (args.Arguments.HasOutcome)
{
results.Enqueue(args.Result!.ToString()!);
}
else
{
hedgingWithoutOutcome = true;
}

return default;
}
})
.Build();

Expand All @@ -91,5 +104,6 @@ public async Task AddHedging_IntegrationTest()
result.Should().Be("success");
results.Should().HaveCountGreaterThan(0);
results.Distinct().Should().ContainSingle("error");
hedgingWithoutOutcome.Should().BeTrue();
}
}
30 changes: 30 additions & 0 deletions test/Polly.Core.Tests/Hedging/HedgingResilienceStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ public async Task ExecuteAsync_EnsurePrimaryTaskCancelled_Ok()
await task;
}

[Fact]
public async Task ExecuteAsync_EnsureSecondaryHedgedTaskReportedWithNoOutcome()
{
// arrange
using var cancelled = new ManualResetEvent(false);
var hasOutcome = true;
_options.OnHedging = args =>
{
hasOutcome = args.Arguments.HasOutcome;
return default;
};

ConfigureHedging(context => Success.AsOutcomeAsync());

var strategy = Create();

// act
var task = strategy.ExecuteAsync(async token =>
{
await _timeProvider.Delay(TimeSpan.FromHours(24), token);
return Success;
});

// assert
_timeProvider.Advance(TimeSpan.FromHours(2));
hasOutcome.Should().BeFalse();
await task;
}

[Fact]
public async Task ExecuteAsync_EnsureDiscardedResultDisposed()
{
Expand Down Expand Up @@ -814,6 +843,7 @@ public async Task ExecuteAsync_EnsureOnHedgingCalled()
var attempts = new List<int>();
_options.OnHedging = args =>
{
args.Arguments.HasOutcome.Should().BeTrue();
args.Result.Should().Be(Failure);
attempts.Add(args.Arguments.Attempt);
return default;
Expand Down