Skip to content

Commit 98be809

Browse files
committed
GroupBy pulls upstream when a substream materialization is waiting
1 parent c80c11c commit 98be809

File tree

2 files changed

+77
-19
lines changed

2 files changed

+77
-19
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
483483
}
484484

485485
[Fact]
486-
public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main()
486+
public void GroupBy_must_work_if_pull_is_exercised_from_both_substream_and_main()
487487
{
488488
this.AssertAllStagesStopped(() =>
489489
{
@@ -517,6 +517,63 @@ public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main(
517517
}, Materializer);
518518
}
519519

520+
[Fact]
521+
public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_while_downstream_is_backpressuring()
522+
{
523+
this.AssertAllStagesStopped(() =>
524+
{
525+
var upstream = this.CreatePublisherProbe<int>();
526+
var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();
527+
528+
Source.FromPublisher(upstream)
529+
.Via(new GroupBy<int, int>(10, element => element))
530+
.RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);
531+
532+
var substream1 = this.CreateSubscriberProbe<int>();
533+
downstreamMaster.Request(1);
534+
upstream.SendNext(1);
535+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);
536+
537+
var substream2 = this.CreateSubscriberProbe<int>();
538+
downstreamMaster.Request(1);
539+
upstream.SendNext(2);
540+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);
541+
542+
substream1.Request(1);
543+
substream1.ExpectNext(1);
544+
substream2.Request(1);
545+
substream2.ExpectNext(2);
546+
547+
// Both substreams pull
548+
substream1.Request(1);
549+
substream2.Request(1);
550+
551+
// Upstream sends new groups
552+
upstream.SendNext(3);
553+
upstream.SendNext(4);
554+
555+
var substream3 = this.CreateSubscriberProbe<int>();
556+
var substream4 = this.CreateSubscriberProbe<int>();
557+
downstreamMaster.Request(1);
558+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream3), Materializer);
559+
downstreamMaster.Request(1);
560+
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream4), Materializer);
561+
562+
substream3.Request(1);
563+
substream3.ExpectNext(3);
564+
substream4.Request(1);
565+
substream4.ExpectNext(4);
566+
567+
// Cleanup, not part of the actual test
568+
substream1.Cancel();
569+
substream2.Cancel();
570+
substream3.Cancel();
571+
substream4.Cancel();
572+
downstreamMaster.Cancel();
573+
upstream.SendComplete();
574+
}, Materializer);
575+
}
576+
520577
[Fact]
521578
public void GroupBy_must_work_with_random_demand()
522579
{
@@ -681,7 +738,7 @@ private void RandomDemand(Dictionary<int, SubFlowState> map, RandomDemandPropert
681738
map[key] = new SubFlowState(state.Probe, false, null);
682739
}
683740
else if (props.BlockingNextElement == null)
684-
break;
741+
break;
685742
}
686743
}
687744

src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public FlattenMerge(int breadth)
181181
internal sealed class PrefixAndTail<T> : GraphStage<FlowShape<T, (IImmutableList<T>, Source<T, NotUsed>)>>
182182
{
183183
#region internal classes
184-
184+
185185
private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
186186
{
187187
private const string SubscriptionTimer = "SubstreamSubscriptionTimer";
@@ -205,11 +205,11 @@ public Logic(PrefixAndTail<T> stage) : base(stage.Shape)
205205
Pull(_stage._in);
206206
_tailSource.SetHandler(new LambdaOutHandler(onPull: () => Pull(_stage._in)));
207207
});
208-
208+
209209
SetHandler(_stage._in, this);
210210
SetHandler(_stage._out, this);
211211
}
212-
212+
213213
protected internal override void OnTimer(object timerKey)
214214
{
215215
var materializer = ActorMaterializerHelper.Downcast(Interpreter.Materializer);
@@ -360,7 +360,7 @@ public PrefixAndTail(int count)
360360
/// <typeparam name="TKey">TBD</typeparam>
361361
internal sealed class GroupBy<T, TKey> : GraphStage<FlowShape<T, Source<T, NotUsed>>>
362362
{
363-
#region Loigc
363+
#region Logic
364364

365365
private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
366366
{
@@ -370,7 +370,7 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
370370
private readonly HashSet<SubstreamSource> _substreamsJustStarted = new HashSet<SubstreamSource>();
371371
private readonly Lazy<Decider> _decider;
372372
private TimeSpan _timeout;
373-
private SubstreamSource _substreamWaitingToBePushed;
373+
private Option<SubstreamSource> _substreamWaitingToBePushed = Option<SubstreamSource>.None;
374374
private Option<TKey> _nextElementKey = Option<TKey>.None;
375375
private Option<T> _nextElementValue = Option<T>.None;
376376
private long _nextId;
@@ -379,12 +379,12 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
379379
public Logic(GroupBy<T, TKey> stage, Attributes inheritedAttributes) : base(stage.Shape)
380380
{
381381
_stage = stage;
382-
382+
383383
_decider = new Lazy<Decider>(() =>
384384
{
385385
var attribute = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
386386
return attribute != null ? attribute.Decider : Deciders.StoppingDecider;
387-
});
387+
});
388388

389389
SetHandler(_stage.In, this);
390390
SetHandler(_stage.Out, this);
@@ -431,11 +431,12 @@ public void OnPush()
431431

432432
public void OnPull()
433433
{
434-
if (_substreamWaitingToBePushed != null)
434+
if (_substreamWaitingToBePushed.HasValue)
435435
{
436-
Push(_stage.Out, Source.FromGraph(_substreamWaitingToBePushed.Source));
437-
ScheduleOnce(_substreamWaitingToBePushed.Key.Value, _timeout);
438-
_substreamWaitingToBePushed = null;
436+
var substreamSource = _substreamWaitingToBePushed.Value;
437+
Push(_stage.Out, Source.FromGraph(substreamSource.Source));
438+
ScheduleOnce(substreamSource.Key.Value, _timeout);
439+
_substreamWaitingToBePushed = Option<SubstreamSource>.None;
439440
}
440441
else
441442
{
@@ -500,7 +501,7 @@ private void Fail(Exception ex)
500501
FailStage(ex);
501502
}
502503

503-
private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement);
504+
private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement || _substreamWaitingToBePushed.HasValue);
504505

505506
public override void PreStart()
506507
{
@@ -530,7 +531,7 @@ private void RunSubstream(TKey key, T value)
530531
{
531532
Push(_stage.Out, Source.FromGraph(substreamSource.Source));
532533
ScheduleOnce(key, _timeout);
533-
_substreamWaitingToBePushed = null;
534+
_substreamWaitingToBePushed = Option<SubstreamSource>.None;
534535
}
535536
else
536537
{
@@ -628,7 +629,7 @@ public GroupBy(int maxSubstreams, Func<T, TKey> keyFor)
628629
{
629630
_maxSubstreams = maxSubstreams;
630631
_keyFor = keyFor;
631-
632+
632633
Shape = new FlowShape<T, Source<T, NotUsed>>(In, Out);
633634
}
634635

@@ -778,7 +779,7 @@ public override void OnDownstreamFinish()
778779
else
779780
// Start draining
780781
if (!_logic.HasBeenPulled(_inlet))
781-
_logic.Pull(_inlet);
782+
_logic.Pull(_inlet);
782783
}
783784

784785
public override void OnPush()
@@ -1010,7 +1011,7 @@ protected CommandScheduledBeforeMaterialization(ICommand command)
10101011
internal class RequestOneScheduledBeforeMaterialization : CommandScheduledBeforeMaterialization
10111012
{
10121013
public static readonly RequestOneScheduledBeforeMaterialization Instance = new RequestOneScheduledBeforeMaterialization(RequestOne.Instance);
1013-
1014+
10141015
private RequestOneScheduledBeforeMaterialization(ICommand command) : base(command)
10151016
{
10161017
}
@@ -1046,7 +1047,7 @@ private RequestOne()
10461047
{
10471048
}
10481049
}
1049-
1050+
10501051
internal class Cancel : ICommand
10511052
{
10521053
public static readonly Cancel Instance = new Cancel();

0 commit comments

Comments
 (0)