Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
Expand All @@ -16,6 +17,7 @@
using Akka.Streams.Implementation;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Streams.Tests.TestHelpers;
using Akka.TestKit;
using Akka.TestKit.Internal;
using Akka.TestKit.Xunit2.Attributes;
Expand Down Expand Up @@ -66,7 +68,7 @@ await c.ExpectNext(3)
}

[Fact]
public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
public async Task A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
{
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 50))
Expand Down Expand Up @@ -467,5 +469,36 @@ Task<int> Deferred()
}
}, Materializer);
}

[Theory(DisplayName = "SelectAsync with restart decider should restart")]
[ClassData(typeof(FailingTaskData<ImmutableList<int>>))]
public async Task SelectAsyncFailingTaskTest(Func<ImmutableList<int>, Task<NotUsed>> mapFunc)
{
var materializer = ActorMaterializer.Create(Sys);

var queue = Source
.Queue<int>(bufferSize: 5000, overflowStrategy: OverflowStrategy.DropNew)
.BatchWeighted(
max: 100,
costFunction: i => i,
seed: r => ImmutableList.Create([r]),
aggregate: (oldRows, i) => oldRows.Add(i))
.SelectAsync(
parallelism: 3,
asyncMapper: mapFunc)
.AddAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream would stop if this attribute isn't declared, SelectAsync default decider is Stop.

.ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Left)
.Run(materializer);

Assert.IsType<QueueOfferResult.Enqueued>(await queue.OfferAsync(1));

await Task.Delay(500.Milliseconds());

// Materializer should stay alive
Assert.False(materializer.IsShutdown);

// Stream should still work, it should not throw a `StreamDetachedException`
Assert.IsType<QueueOfferResult.Enqueued>(await queue.OfferAsync(1));
}
}
}
73 changes: 73 additions & 0 deletions src/core/Akka.Streams.Tests/TestHelpers/FailingTaskData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// -----------------------------------------------------------------------
// <copyright file="FailingTaskData.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Streams.Tests.TestHelpers;

internal sealed class FailException() : Exception("Test Exception");

public sealed class FailingTaskData<TIn> : TheoryData<Func<TIn, Task<NotUsed>>>, IDisposable
{
private readonly CancellationTokenSource _cancelledCts;

public FailingTaskData()
{
_cancelledCts = new CancellationTokenSource();
_cancelledCts.Cancel();

var cancelledTask = Task.FromCanceled<NotUsed>(_cancelledCts.Token);

// Immediate return
Add(_ => Task.FromException<NotUsed>(new FailException()));
Add(_ =>
{
_cancelledCts.Token.ThrowIfCancellationRequested();
return Task.FromResult(NotUsed.Instance);
});
Add(_ => cancelledTask);
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
Add(async _ => throw new FailException());
Add(async _ => await cancelledTask);
Add(async _ => cancelledTask.Result);
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

// Delayed exception
Add(async _ =>
{
await Task.Delay(100.Milliseconds());
throw new FailException();
});
Add(async _ =>
{
await Task.Yield();
throw new FailException();
});
Add(async _ =>
{
using var cts = new CancellationTokenSource(100.Milliseconds());
await Task.Delay(3.Seconds(), cts.Token);
return NotUsed.Instance;
});
Add(async _ =>
{
_cancelledCts.Token.ThrowIfCancellationRequested();
await Task.Yield();
return NotUsed.Instance;
});

}

public void Dispose()
{
_cancelledCts.Dispose();
}
}
Loading