Skip to content

Commit 1456237

Browse files
ismaelhamedArkatufus
authored andcommitted
Cancel GroupBy when all substreams cancel
(cherry picked from commit 4584f59)
1 parent 389c000 commit 1456237

File tree

2 files changed

+62
-12
lines changed

2 files changed

+62
-12
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,50 @@ public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_whil
574574
}, Materializer);
575575
}
576576

577+
[Fact]
578+
public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel()
579+
{
580+
this.AssertAllStagesStopped(() =>
581+
{
582+
var upstream = this.CreatePublisherProbe<int>();
583+
var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();
584+
585+
Source.FromPublisher(upstream)
586+
.Via(new GroupBy<int, int>(10, element => element))
587+
.RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);
588+
589+
var substream1 = this.CreateSubscriberProbe<int>();
590+
downstreamMaster.Request(1);
591+
upstream.SendNext(1);
592+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);
593+
594+
var substream2 = this.CreateSubscriberProbe<int>();
595+
downstreamMaster.Request(1);
596+
upstream.SendNext(2);
597+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);
598+
599+
// Cancel downstream
600+
downstreamMaster.Cancel();
601+
602+
// Both substreams still work
603+
substream1.Request(1);
604+
substream1.ExpectNext(1);
605+
substream2.Request(1);
606+
substream2.ExpectNext(2);
607+
608+
// New keys are ignored
609+
upstream.SendNext(3);
610+
upstream.SendNext(4);
611+
612+
// Cancel all substreams
613+
substream1.Cancel();
614+
substream2.Cancel();
615+
616+
// Upstream gets cancelled
617+
upstream.ExpectCancellation();
618+
}, Materializer);
619+
}
620+
577621
[Fact]
578622
public void GroupBy_must_work_with_random_demand()
579623
{

src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
using Akka.Streams.Implementation.Stages;
1616
using Akka.Streams.Stage;
1717
using Akka.Streams.Supervision;
18-
using Akka.Streams.Util;
1918
using Akka.Util;
2019
using Akka.Util.Internal;
2120

@@ -464,9 +463,7 @@ public void OnUpstreamFinish()
464463

465464
public void OnDownstreamFinish()
466465
{
467-
if (_activeSubstreams.Count == 0)
468-
CompleteStage();
469-
else
466+
if (!TryCancel())
470467
SetKeepGoing(true);
471468
}
472469

@@ -493,6 +490,18 @@ private bool TryCompleteAll()
493490
return false;
494491
}
495492

493+
private bool TryCancel()
494+
{
495+
// if there's no active substreams or there's only one but it's not been pushed yet
496+
if (_activeSubstreams.Count == 0 || (_activeSubstreams.Count == 1 && _substreamWaitingToBePushed.HasValue))
497+
{
498+
CompleteStage();
499+
return true;
500+
}
501+
502+
return false;
503+
}
504+
496505
private void Fail(Exception ex)
497506
{
498507
foreach (var value in _activeSubstreams.Values)
@@ -602,15 +611,12 @@ public void OnPull()
602611

603612
public void OnDownstreamFinish()
604613
{
605-
if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key))
606-
_logic.ClearNextElement();
607-
if (FirstPush)
608-
_logic._firstPushCounter--;
614+
if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) _logic.ClearNextElement();
615+
if (FirstPush) _logic._firstPushCounter--;
609616
CompleteSubStream();
610-
if (_logic.IsClosed(_logic._stage.In))
611-
_logic.TryCompleteAll();
612-
else if (_logic.NeedToPull)
613-
_logic.Pull(_logic._stage.In);
617+
if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel();
618+
if (_logic.IsClosed(_logic._stage.In)) _logic.TryCompleteAll();
619+
else if (_logic.NeedToPull) _logic.Pull(_logic._stage.In);
614620
}
615621
}
616622
}

0 commit comments

Comments
 (0)