Skip to content

Commit 7a7811a

Browse files
committed
Fix #7534
1 parent 1efdfa0 commit 7a7811a

File tree

1 file changed

+10
-2
lines changed
  • src/core/Akka.Streams/Implementation/Fusing

1 file changed

+10
-2
lines changed

src/core/Akka.Streams/Implementation/Fusing/Ops.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2547,6 +2547,8 @@ private sealed class Logic : InAndOutGraphStageLogic
25472547
{
25482548
private sealed class Holder<T>(object? message, Result<T> element)
25492549
{
2550+
private Directive? _cachedDirective = null;
2551+
25502552
public object? Message { get; } = message;
25512553

25522554
public Result<T> Element { get; private set; } = element;
@@ -2557,6 +2559,12 @@ public void SetElement(Result<T> result)
25572559
? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
25582560
: result;
25592561
}
2562+
2563+
public Directive SupervisionDirectiveFor(Decider decider, Exception ex)
2564+
{
2565+
_cachedDirective ??= decider(ex);
2566+
return _cachedDirective.Value;
2567+
}
25602568
}
25612569

25622570
private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());
@@ -2673,7 +2681,7 @@ private void PushOne()
26732681
{
26742682
// this could happen if we are looping in PushOne and end up on a failed Task before the
26752683
// HolderCompleted callback has run
2676-
var strategy = _decider(result.Exception);
2684+
var strategy = holder.SupervisionDirectiveFor(_decider, result.Exception!);
26772685
Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
26782686
switch (strategy)
26792687
{
@@ -2720,7 +2728,7 @@ private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
27202728
}
27212729

27222730
var exception = result.Exception;
2723-
var strategy = _decider(exception);
2731+
var strategy = holder.SupervisionDirectiveFor(_decider, exception!);
27242732
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
27252733
switch (strategy)
27262734
{

0 commit comments

Comments
 (0)