Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4430,7 +4430,12 @@ namespace Akka.Streams.Implementation.Fusing
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SelectAsyncUnordered<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1,
1,
1})]
public sealed class SelectAsyncUnordered<[System.Runtime.CompilerServices.NullableAttribute(2)] TIn, [System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
{
public readonly Akka.Streams.Inlet<TIn> In;
public readonly Akka.Streams.Outlet<TOut> Out;
Expand All @@ -4440,7 +4445,12 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SelectAsync<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1,
1,
1})]
public sealed class SelectAsync<[System.Runtime.CompilerServices.NullableAttribute(2)] TIn, [System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
{
public readonly Akka.Streams.Inlet<TIn> In;
public readonly Akka.Streams.Outlet<TOut> Out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4404,7 +4404,12 @@ namespace Akka.Streams.Implementation.Fusing
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SelectAsyncUnordered<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1,
1,
1})]
public sealed class SelectAsyncUnordered<[System.Runtime.CompilerServices.NullableAttribute(2)] TIn, [System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
{
public readonly Akka.Streams.Inlet<TIn> In;
public readonly Akka.Streams.Outlet<TOut> Out;
Expand All @@ -4414,7 +4419,12 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SelectAsync<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1,
1,
1})]
public sealed class SelectAsync<[System.Runtime.CompilerServices.NullableAttribute(2)] TIn, [System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
{
public readonly Akka.Streams.Inlet<TIn> In;
public readonly Akka.Streams.Outlet<TOut> Out;
Expand Down
8 changes: 3 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/FlowAskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ public async Task Flow_with_ask_must_signal_ask_timeout_failure() => await this.

c.ExpectSubscription().Request(10);
var error = c.ExpectError();
error.As<AggregateException>().Flatten()
.InnerException
.Should().BeOfType<AskTimeoutException>();
error.Should().BeOfType<AskTimeoutException>();
return Task.CompletedTask;
}, _materializer);

Expand All @@ -253,8 +251,8 @@ public async Task Flow_with_ask_must_signal_ask_failure() => await this.AssertAl
.Ask<Reply>(failsOn, _timeout, 1)
.RunWith(Sink.FromSubscriber(c), _materializer);

var error = (AggregateException)c.ExpectSubscriptionAndError();
error.InnerException.Message.Should().Be("Booming for 1!");
var error = c.ExpectSubscriptionAndError();
error.Message.Should().Be("Booming for 1!");
return Task.CompletedTask;
}, _materializer);

Expand Down
50 changes: 26 additions & 24 deletions src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ await this.AssertAllStagesStoppedAsync(async() => {

var exception = await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[]{1, 2})
.ExpectNextN([1, 2])
.ExpectErrorAsync()
.ShouldCompleteWithin(RemainingOrDefault);
exception.InnerException!.Message.Should().Be("err1");
exception.Message.Should().Be("err1");
}, Materializer);
}

Expand Down Expand Up @@ -232,7 +232,7 @@ await this.AssertAllStagesStoppedAsync(async () => {
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
(await c.ExpectErrorAsync()).Message.Should().Be("err2");
}, Materializer);
}

Expand All @@ -258,7 +258,7 @@ await this.AssertAllStagesStoppedAsync(async () =>

await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[] { 1, 2 })
.ExpectNextN([1, 2])
.ExpectErrorAsync();

invoked.Should().BeTrue();
Expand Down Expand Up @@ -358,21 +358,21 @@ public async Task A_Flow_with_SelectAsync_must_signal_NPE_when_task_is_completed
{
var c = this.CreateManualSubscriberProbe<string>();

Source.From(new[] {"a", "b"})
.SelectAsync(4, _ => Task.FromResult(null as string))
Source.From(["a", "b"])
.SelectAsync(4, _ => Task.FromResult<string>(null))
.To(Sink.FromSubscriber(c)).Run(Materializer);

var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
(await c.ExpectErrorAsync()).Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_resume_when_task_is_completed_with_null()
{
var c = this.CreateManualSubscriberProbe<string>();
Source.From(new[] { "a", "b", "c" })
.SelectAsync(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s))
Source.From(["a", "b", "c"])
.SelectAsync(4, s => s.Equals("b") ? Task.FromResult<string>(null) : Task.FromResult(s))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = await c.ExpectSubscriptionAsync();
Expand Down Expand Up @@ -438,21 +438,6 @@ await this.AssertAllStagesStoppedAsync(async() =>
}, cancellation.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

Task<int> Deferred()
{
var promise = new TaskCompletionSource<int>();
if (counter.IncrementAndGet() > parallelism)
promise.SetException(new Exception("parallelism exceeded"));
else
{
var wrote = queue.Writer.TryWrite((promise, DateTime.Now.Ticks));
if (!wrote)
promise.SetException(new Exception("Failed to write to queue"));
}

return promise.Task;
}

try
{
const int n = 10000;
Expand All @@ -467,6 +452,23 @@ Task<int> Deferred()
{
cancellation.Cancel(false);
}

return;

Task<int> Deferred()
{
var promise = new TaskCompletionSource<int>();
if (counter.IncrementAndGet() > parallelism)
promise.SetException(new Exception("parallelism exceeded"));
else
{
var wrote = queue.Writer.TryWrite((promise, DateTime.Now.Ticks));
if (!wrote)
promise.SetException(new Exception("Failed to write to queue"));
}

return promise.Task;
}
}, Materializer);
}

Expand Down
78 changes: 40 additions & 38 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Akka.Streams.Util;
using Akka.Util;
using Akka.Util.Internal;
using Debug = System.Diagnostics.Debug;
using Decider = Akka.Streams.Supervision.Decider;
using Directive = Akka.Streams.Supervision.Directive;

Expand Down Expand Up @@ -2512,61 +2513,47 @@ public Expand(Func<TIn, IEnumerator<TOut>> extrapolate)
/// </returns>
public override string ToString() => "Expand";
}

#nullable enable
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable nullable to take advantage of the changes made in #7520


/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
[InternalApi]
public sealed class SelectAsync<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
#region internal classes

private sealed class Logic : InAndOutGraphStageLogic
{
private class Holder<T>
private sealed class Holder<T>(object? message, Result<T> element)
{
private readonly Action<Holder<T>> _callback;

public Holder(object message, Result<T> element, Action<Holder<T>> callback)
{
_callback = callback;
Message = message;
Element = element;
}

public Result<T> Element { get; private set; }
public object Message { get; }
public object? Message { get; private set; } = message;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null might be a legal value for the message for all we know


public Result<T> Element { get; private set; } = element;

public void SetElement(Result<T> result)
{
Element = result.IsSuccess && result.Value == null
? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
: result;
}

public void Invoke(Result<T> result)
{
SetElement(result);
_callback(this);
}
}

private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());

private readonly SelectAsync<TIn, TOut> _stage;
private readonly Decider _decider;
private IBuffer<Holder<TOut>> _buffer;
private readonly Action<Holder<TOut>> _taskCallback;
private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncCallbacks in Akka.Streams can only accept a single input parameter, so we have to go with a ValueTuple here


public Logic(Attributes inheritedAttributes, SelectAsync<TIn, TOut> stage) : base(stage.Shape)
{
_stage = stage;
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
var attr = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>();
_decider = attr != null ? attr.Decider : Deciders.StoppingDecider;

_taskCallback = GetAsyncCallback<Holder<TOut>>(HolderCompleted);
_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Result<TOut> result)>(t => HolderCompleted(t.holder, t.result));

SetHandlers(stage.In, stage.Out, this);
}
Expand All @@ -2577,19 +2564,33 @@ public override void OnPush()
try
{
var task = _stage._mapFunc(message);
var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
var holder = new Holder<TOut>(message, NotYetThere);
_buffer.Enqueue(holder);

// We dispatch the task if it's ready to optimize away
// scheduling it to an execution context
if (task.IsCompleted)
{
holder.SetElement(Result.FromTask(task));
HolderCompleted(holder);
HolderCompleted(holder, Result.FromTask(task));
}
else
task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
TaskContinuationOptions.ExecuteSynchronously);
{
async Task WaitForTask()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used the latest .NET meta for relying on a local function + detached Task + await on parent task instead of using ContinueWith - this is designed to tremendously simplify error-handling mostly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the side effect of eliminating the AggregateExceptions previously emitted from SelectAsync, which is why I've had to update some of the unit tests.

{
try
{
var result = Result.Success(await task);
_taskCallback((holder, result));
}
catch(Exception ex){
var result = Result.Failure<TOut>(ex);
_taskCallback((holder, result));
}
}

_ = WaitForTask();
}

}
catch (Exception e)
{
Expand All @@ -2606,7 +2607,7 @@ public override void OnPush()
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", e);
}
}
if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
Expand Down Expand Up @@ -2663,12 +2664,12 @@ private void PushOne()
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
}
continue;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since SetElement() always get invoked in the same thread now, does that mean that this bit of code isn't needed anymore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole error handling code inside PushOne() I mean

Copy link
Member Author

@Aaronontheweb Aaronontheweb Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we decided in Slack that we probably still need error-handling in both places since there can be a delay on FailStage taking effect in the AsyncCallback - we'd still want to check for errors whenever else we're calling PushOne


Push(_stage.Out, result.Value);
Push(_stage.Out!, result.Value);
if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
TryPull(inlet);
}
Expand All @@ -2677,17 +2678,18 @@ private void PushOne()
}
}

private void HolderCompleted(Holder<TOut> holder)
private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real fix - we now pass in both the Holder<TOut> and the Result<TOut> together into the StageActor's context and perform the assignment there, where it's guaranteed to be thread-safe, rather than doing it half-in / half-out like before.

{
var element = holder.Element;
if (element.IsSuccess)
// we may not be at the front of the line right now, so save the result for later
holder.SetElement(result);
if (result.IsSuccess)
{
if (IsAvailable(_stage.Out))
PushOne();
return;
}

var exception = element.Exception;
var exception = result.Exception;
var strategy = _decider(exception);
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
switch (strategy)
Expand All @@ -2703,7 +2705,7 @@ private void HolderCompleted(Holder<TOut> holder)
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", exception);
}
}

Expand Down Expand Up @@ -2758,8 +2760,6 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
[InternalApi]
public sealed class SelectAsyncUnordered<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
Expand Down Expand Up @@ -2904,6 +2904,8 @@ public SelectAsyncUnordered(int parallelism, Func<TIn, Task<TOut>> mapFunc)
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
=> new Logic(inheritedAttributes, this);
}

#nullable disable

/// <summary>
/// INTERNAL API
Expand Down
Loading