Skip to content

Conversation

Aaronontheweb
Copy link
Member

@Aaronontheweb Aaronontheweb commented Mar 13, 2025

Changes

close #7518

struct assignments are not guaranteed to be atomic in .NET because each field, individually, has to be assigned - as explained by the brilliant Eric Lippert: https://ericlippert.com/2011/05/31/atomicity-volatility-and-immutability-are-different-part-two/

This PR fixes an unsafe struct assignment that caused #7518

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes

public override string ToString() => "Expand";
}

#nullable enable
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable nullable to take advantage of the changes made in #7520

private sealed class Logic : InAndOutGraphStageLogic
{
private class Holder<T>
private sealed class Holder<T>(object message, Result<T> element)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used a primary CTOR because I felt like it 🤫

private readonly Decider _decider;
private IBuffer<Holder<TOut>> _buffer;
private readonly Action<Holder<TOut>> _taskCallback;
private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncCallbacks in Akka.Streams can only accept a single input parameter, so we have to go with a ValueTuple here


public Result<T> Element { get; private set; }
public object Message { get; }
public object? Message { get; private set; } = message;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null might be a legal value for the message for all we know

task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
TaskContinuationOptions.ExecuteSynchronously);
{
async Task WaitForTask()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used the latest .NET meta for relying on a local function + detached Task + await on parent task instead of using ContinueWith - this is designed to tremendously simplify error-handling mostly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the side effect of eliminating the AggregateExceptions previously emitted from SelectAsync, which is why I've had to update some of the unit tests.

}

private void HolderCompleted(Holder<TOut> holder)
private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real fix - we now pass in both the Holder<TOut> and the Result<TOut> together into the StageActor's context and perform the assignment there, where it's guaranteed to be thread-safe, rather than doing it half-in / half-out like before.

throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
}
continue;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since SetElement() always get invoked in the same thread now, does that mean that this bit of code isn't needed anymore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole error handling code inside PushOne() I mean

Copy link
Member Author

@Aaronontheweb Aaronontheweb Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we decided in Slack that we probably still need error-handling in both places since there can be a delay on FailStage taking effect in the AsyncCallback - we'd still want to check for errors whenever else we're calling PushOne

Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 253973e into akkadotnet:dev Mar 13, 2025
10 of 12 checks passed
@Aaronontheweb Aaronontheweb deleted the fix-7518-SelectAsync branch March 13, 2025 18:40
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Mar 18, 2025
…ntinueWith`

Replicates some of the behaviors and fixes we made to `SelectAsync` in akkadotnet#7521
Aaronontheweb added a commit that referenced this pull request Mar 18, 2025
…instead of `ContinueWith` (#7531)

* Have `SelectAsyncUnordered` use local `async` function instead of `ContinueWith`

Replicates some of the behaviors and fixes we made to `SelectAsync` in #7521

* fixed `FlowSelectAsyncUnorderedSpecs`
This was referenced Oct 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SelectAsync stopped working even when async code block is guarded by a try...catch block

3 participants