Skip to content

Commit a1996d4

Browse files
authored
Streams: Port missing SelectAsync unit tests (#7532)
* Streams: Port missing SelectAsync unit tests * Fix #7533 * Fix #7534 * Revert "Fix #7534" This reverts commit 7a7811a. * Remove multiple decider call specs * Remove extraneous error logging
1 parent 9de5ed0 commit a1996d4

File tree

2 files changed

+357
-20
lines changed

2 files changed

+357
-20
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs

Lines changed: 341 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,107 @@ await Awaiting(async () => await done).Should()
211211
latch.CountDown();
212212
}, Materializer);
213213
}
214+
215+
[Fact(DisplayName = "A Flow with SelectAsync that failed mid-stream MUST cause a failure ASAP (stopping strategy)")]
216+
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync_MidStream_Stop()
217+
{
218+
var tsa = new TaskCompletionSource<string>();
219+
var tsb = new TaskCompletionSource<string>();
220+
var tsc = new TaskCompletionSource<string>();
221+
var tsd = new TaskCompletionSource<string>();
222+
var tse = new TaskCompletionSource<string>();
223+
var tsf = new TaskCompletionSource<string>();
224+
225+
var input = new []{ tsa, tsb, tsc, tsd, tse , tsf };
226+
227+
await this.AssertAllStagesStoppedAsync(async () =>
228+
{
229+
var probe = Source.From(input)
230+
.SelectAsync(5, n => n.Task)
231+
.RunWith(this.SinkProbe<string>(), Materializer);
232+
233+
probe.Request(100);
234+
235+
// placing the future completion signals here is important
236+
// the ordering is meant to expose a race between the failure at C and subsequent elements
237+
tsa.SetResult("A");
238+
tsb.SetResult("B");
239+
tsc.SetException(new TestException("Boom at C"));
240+
tsd.SetResult("D");
241+
tse.SetResult("E");
242+
tsf.SetResult("F");
243+
244+
switch (await probe.ExpectNextOrErrorAsync())
245+
{
246+
case Exception ex:
247+
ex.Should().BeOfType<AggregateException>()
248+
.Which.InnerException.Should().BeOfType<TestException>()
249+
.Which.Message.Should().Be("Boom at C"); // fine, error can over-take elements
250+
return;
251+
case "A":
252+
switch (await probe.ExpectNextOrErrorAsync())
253+
{
254+
case Exception ex:
255+
ex.Should().BeOfType<AggregateException>()
256+
.Which.InnerException.Should().BeOfType<TestException>()
257+
.Which.Message.Should().Be("Boom at C"); // fine, error can over-take elements
258+
return;
259+
case "B":
260+
switch (await probe.ExpectNextOrErrorAsync())
261+
{
262+
case Exception ex:
263+
ex.Should().BeOfType<AggregateException>()
264+
.Which.InnerException.Should().BeOfType<TestException>()
265+
.Which.Message.Should().Be("Boom at C"); // fine
266+
return;
267+
case string s:
268+
Assert.Fail($"Got [{s}] yet it caused an exception, should not have happened!");
269+
return;
270+
}
271+
return;
272+
case var unexpected:
273+
Assert.Fail($"Unexpected {unexpected}");
274+
return;
275+
}
276+
case var unexpected:
277+
Assert.Fail($"Unexpected {unexpected}");
278+
return;
279+
}
280+
}, Materializer);
281+
}
282+
283+
[Fact(DisplayName = "A Flow with SelectAsync that failed mid-stream MUST skip element (resume strategy)")]
284+
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync_MidStream_Result()
285+
{
286+
var tsa = new TaskCompletionSource<string>();
287+
var tsb = new TaskCompletionSource<string>();
288+
var tsc = new TaskCompletionSource<string>();
289+
var tsd = new TaskCompletionSource<string>();
290+
var tse = new TaskCompletionSource<string>();
291+
var tsf = new TaskCompletionSource<string>();
292+
293+
var input = new[] { tsa, tsb, tsc, tsd, tse, tsf };
294+
295+
await this.AssertAllStagesStoppedAsync(async () =>
296+
{
297+
var task = Source.From(input)
298+
.SelectAsync(5, n => n.Task)
299+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
300+
.RunWith(Sink.Seq<string>(), Materializer);
301+
302+
// the problematic ordering:
303+
tsa.SetResult("A");
304+
tsb.SetResult("B");
305+
tsd.SetResult("D");
306+
tse.SetResult("E");
307+
tsf.SetResult("F");
308+
tsc.SetException(new TestException("Boom at C"));
309+
310+
var elements = await task;
311+
elements.Should().BeEquivalentTo(new[] { "A", "B", "D", "E", "F" },
312+
options => options.WithStrictOrdering());
313+
}, Materializer);
314+
}
214315

215316
[Fact]
216317
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
@@ -286,11 +387,36 @@ await this.AssertAllStagesStoppedAsync(async () => {
286387
await c.ExpectCompleteAsync();
287388
}, Materializer);
288389
}
390+
391+
[Fact]
392+
public async Task A_Flow_with_SelectAsync_must_resume_when_task_already_failed()
393+
{
394+
await this.AssertAllStagesStoppedAsync(async () => {
395+
var c = this.CreateManualSubscriberProbe<int>();
396+
Source.From(Enumerable.Range(1, 5))
397+
.SelectAsync(4, n =>
398+
{
399+
var tcs = new TaskCompletionSource<int>();
400+
if (n == 3)
401+
tcs.TrySetException(new TestException("err3"));
402+
else
403+
tcs.TrySetResult(n);
404+
return tcs.Task;
405+
})
406+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
407+
.RunWith(Sink.FromSubscriber(c), Materializer);
408+
var sub = await c.ExpectSubscriptionAsync();
409+
sub.Request(10);
410+
foreach (var i in new[] { 1, 2, 4, 5 })
411+
await c.ExpectNextAsync(i);
412+
await c.ExpectCompleteAsync();
413+
}, Materializer);
414+
}
289415

290416
[Fact]
291417
public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
292418
{
293-
await this.AssertAllStagesStoppedAsync(() => {
419+
await this.AssertAllStagesStoppedAsync(async () => {
294420
var futures = new[]
295421
{
296422
Task.Run(() => { throw new TestException("failure1"); return "";}),
@@ -312,6 +438,62 @@ await this.AssertAllStagesStoppedAsync(() => {
312438
}, Materializer);
313439
}
314440

441+
[Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand (parallelism = 1)")]
442+
public async Task CompleteWithoutDemand()
443+
{
444+
var probe = Source.Single(1)
445+
.SelectAsync(1, v => Task.Run(async () =>
446+
{
447+
await Task.Delay(20);
448+
return v;
449+
}))
450+
.RunWith(this.SinkProbe<int>(), Materializer);
451+
452+
probe.Request(1);
453+
await probe.ExpectNextAsync(1);
454+
await probe.ExpectCompleteAsync();
455+
}
456+
457+
[Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand with completed task (parallelism = 1)")]
458+
public async Task CompleteWithoutDemandCompletedTask()
459+
{
460+
var probe = Source.Single(1)
461+
.SelectAsync(1, Task.FromResult)
462+
.RunWith(this.SinkProbe<int>(), Materializer);
463+
464+
probe.Request(1);
465+
await probe.ExpectNextAsync(1);
466+
await probe.ExpectCompleteAsync();
467+
}
468+
469+
[Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand (parallelism = 2)")]
470+
public async Task CompleteWithoutDemandP2()
471+
{
472+
var probe = Source.From(new[] { 1, 2 })
473+
.SelectAsync(2, v => Task.Run(async () =>
474+
{
475+
await Task.Delay(20);
476+
return v;
477+
}))
478+
.RunWith(this.SinkProbe<int>(), Materializer);
479+
480+
probe.Request(2);
481+
await probe.ExpectNextNAsync(2).ToListAsync();
482+
await probe.ExpectCompleteAsync();
483+
}
484+
485+
[Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand with completed task (parallelism = 2)")]
486+
public async Task CompleteWithoutDemandCompletedTaskP2()
487+
{
488+
var probe = Source.From(new[] { 1, 2 })
489+
.SelectAsync(2, Task.FromResult)
490+
.RunWith(this.SinkProbe<int>(), Materializer);
491+
492+
probe.Request(2);
493+
await probe.ExpectNextNAsync(2).ToListAsync();
494+
await probe.ExpectCompleteAsync();
495+
}
496+
315497
[Fact]
316498
public async Task A_Flow_with_SelectAsync_must_finish_after_task_failure()
317499
{
@@ -333,6 +515,28 @@ await this.AssertAllStagesStoppedAsync(async() =>
333515
}, Materializer);
334516
}
335517

518+
[Fact]
519+
public async Task A_Flow_with_SelectAsync_must_resume_after_task_cancels()
520+
{
521+
var c = this.CreateManualSubscriberProbe<int>();
522+
await this.AssertAllStagesStoppedAsync(async () =>
523+
{
524+
Source.From(Enumerable.Range(1, 5))
525+
.SelectAsync(4, async n =>
526+
{
527+
await MaybeCancels(n);
528+
return n;
529+
})
530+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
531+
.RunWith(Sink.FromSubscriber(c), Materializer);
532+
var sub = await c.ExpectSubscriptionAsync();
533+
sub.Request(10);
534+
foreach (var i in new[] { 1, 2, 4, 5 })
535+
await c.ExpectNextAsync(i);
536+
await c.ExpectCompleteAsync();
537+
}, Materializer);
538+
}
539+
336540
[Fact]
337541
public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws()
338542
{
@@ -353,6 +557,60 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws()
353557
await c.ExpectCompleteAsync();
354558
}
355559

560+
[Fact]
561+
public async Task A_Flow_with_SelectAsync_must_restart_after_task_throws()
562+
{
563+
var c = this.CreateManualSubscriberProbe<int>();
564+
Source.From(Enumerable.Range(1, 5))
565+
.Select(n => n)
566+
.SelectAsync(4, n => Task.Run(async () =>
567+
{
568+
await Task.Yield();
569+
if(n == 3)
570+
throw new TestException("err3");
571+
return n;
572+
}))
573+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
574+
.RunWith(Sink.FromSubscriber(c), Materializer);
575+
var sub = await c.ExpectSubscriptionAsync();
576+
sub.Request(10);
577+
foreach (var i in new[] { 1, 2, 4, 5})
578+
await c.ExpectNextAsync(i);
579+
}
580+
581+
[Fact]
582+
public async Task A_Flow_with_SelectAsync_must_restart_when_SelectAsync_task_cancelled()
583+
{
584+
var c = this.CreateManualSubscriberProbe<int>();
585+
Source.From(Enumerable.Range(1, 5))
586+
.Select(n => n)
587+
.SelectAsync(4, async n =>
588+
{
589+
await MaybeCancels(n);
590+
return n;
591+
})
592+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
593+
.RunWith(Sink.FromSubscriber(c), Materializer);
594+
var sub = await c.ExpectSubscriptionAsync();
595+
sub.Request(10);
596+
foreach (var i in new[] { 1, 2, 4, 5})
597+
await c.ExpectNextAsync(i);
598+
}
599+
600+
private static Task<int> MaybeCancels(int n)
601+
{
602+
var tcs = new TaskCompletionSource<int>();
603+
Task.Run(async () =>
604+
{
605+
await Task.Yield();
606+
if (n == 3)
607+
tcs.TrySetCanceled();
608+
else
609+
tcs.TrySetResult(n);
610+
});
611+
return tcs.Task;
612+
}
613+
356614
[Fact]
357615
public async Task A_Flow_with_SelectAsync_must_signal_NPE_when_task_is_completed_with_null()
358616
{
@@ -382,6 +640,59 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_task_is_completed_wit
382640
await c.ExpectCompleteAsync();
383641
}
384642

643+
[Fact(DisplayName = "A Flow with SelectAsync must continue emitting after a sequence of nulls")]
644+
public async Task SelectAsyncNullSequence()
645+
{
646+
var flow = Flow.Create<int>()
647+
.SelectAsync(3, v => v is 0 or >= 100
648+
? Task.FromResult(v.ToString())
649+
: Task.FromResult<string>(null));
650+
651+
var task = Source.From(Enumerable.Range(0, 103))
652+
.Via(flow)
653+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
654+
.RunWith(Sink.Seq<string>(), Materializer);
655+
656+
var result = await task;
657+
result.Should().BeEquivalentTo(new[]{"0", "100", "101", "102"}, o => o.WithStrictOrdering());
658+
}
659+
660+
[Fact(DisplayName = "A Flow with SelectAsync must complete without emitting any elements after a sequence of nulls only")]
661+
public async Task SelectAsyncAllNullSequence()
662+
{
663+
var flow = Flow.Create<int>()
664+
.SelectAsync(3, _ => Task.FromResult<string>(null));
665+
666+
var task = Source.From(Enumerable.Range(0, 10))
667+
.Via(flow)
668+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
669+
.RunWith(Sink.Seq<string>(), Materializer);
670+
671+
var result = await task;
672+
result.Should().BeEmpty();
673+
}
674+
675+
[Fact(DisplayName = "A Flow with SelectAsync must complete if future task returning null completed last")]
676+
public async Task SelectAsyncNullLast()
677+
{
678+
var ts1 = new TaskCompletionSource<string>();
679+
var ts2 = new TaskCompletionSource<string>();
680+
var ts3 = new TaskCompletionSource<string>();
681+
var taskSources = new[] { ts1, ts2, ts3 };
682+
683+
var task = Source.From(taskSources)
684+
.SelectAsync(2, t => t.Task)
685+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
686+
.RunWith(Sink.Seq<string>(), Materializer);
687+
688+
ts1.TrySetResult("1");
689+
ts3.TrySetResult("3");
690+
ts2.TrySetResult(null);
691+
692+
var result = await task;
693+
result.Should().BeEquivalentTo(new[]{"1", "3"}, o => o.WithStrictOrdering());
694+
}
695+
385696
[Fact]
386697
public async Task A_Flow_with_SelectAsync_must_handle_cancel_properly()
387698
{
@@ -472,6 +783,35 @@ Task<int> Deferred()
472783
}, Materializer);
473784
}
474785

786+
[Fact(DisplayName = "A Flow with SelectAsync must not invoke the decider twice when SelectAsync throws")]
787+
public async Task SelectAsyncDeciderFailingSelectAsync()
788+
{
789+
var failCount = new AtomicCounter(0);
790+
var result = await Source.From(new[]{true, false})
791+
.SelectAsync(1, elem =>
792+
{
793+
if (elem)
794+
throw new TestException("this has gone too far");
795+
return Task.FromResult(elem);
796+
})
797+
.AddAttributes(ActorAttributes.CreateSupervisionStrategy(cause =>
798+
{
799+
switch (cause)
800+
{
801+
case TestException:
802+
failCount.IncrementAndGet();
803+
return Directive.Resume;
804+
default:
805+
return Directive.Stop;
806+
}
807+
}))
808+
.RunWith(Sink.Seq<bool>(), Materializer);
809+
810+
result.Count.Should().Be(1);
811+
result[0].Should().BeFalse();
812+
failCount.Current.Should().Be(1);
813+
}
814+
475815
[Theory(DisplayName = "SelectAsync with restart decider should restart")]
476816
[ClassData(typeof(FailingTaskData<ImmutableList<int>>))]
477817
public async Task SelectAsyncFailingTaskTest(Func<ImmutableList<int>, Task<NotUsed>> mapFunc)

0 commit comments

Comments
 (0)