Skip to content

Commit ea68408

Browse files
Fix 7578 (#7615)
* Add reproduction unit test * Port akka/akka#23970 * Update API Approval list * Update API Approval list * Add async callback unit tests * skip non-implemented feature * Fix TaskCompletionSource allocation problem * Code cleanup * Implement early callback from akka/akka#23185 * Make sure early callbacks also get cancelled on stop * Fix naming and copyright header * Update API Approval list * Fix missing feedback callback * rerun tests * Implement locks * cleanup lock code * extend locking to `_callbackWaitingForInterpreter` * Fix unit test * rerun tests * xml-doc and TBD cleanup * more TBD and XML-DOC cleanup * defined `ConcurrentAsyncCallback` * introduced `ConcurrentAsyncCallback` into `GraphStage` * fixed all compilation errors * cleanup hub code * added API approvals * cleaned up reverse --------- Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 98ebc72 commit ea68408

File tree

9 files changed

+1073
-351
lines changed

9 files changed

+1073
-351
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3967,7 +3967,8 @@ namespace Akka.Streams.Implementation.Fusing
39673967
public readonly object Event;
39683968
public readonly System.Action<object> Handler;
39693969
public readonly Akka.Streams.Stage.GraphStageLogic Logic;
3970-
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action<object> handler) { }
3970+
public readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> Promise;
3971+
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
39713972
public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; }
39723973
}
39733974
public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic
@@ -4202,7 +4203,7 @@ namespace Akka.Streams.Implementation.Fusing
42024203
public readonly Akka.Streams.Stage.GraphStageLogic[] Logics;
42034204
public readonly Akka.Streams.IMaterializer Materializer;
42044205
public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null;
4205-
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> OnAsyncInput;
4206+
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> OnAsyncInput;
42064207
public const int OutClosed = 32;
42074208
public const int OutReady = 8;
42084209
public const int PullEndFlip = 10;
@@ -4213,7 +4214,7 @@ namespace Akka.Streams.Implementation.Fusing
42134214
public const int Pushing = 4;
42144215
public int RunningStagesCount;
42154216
public static readonly Akka.Streams.Attributes[] SingleNoAttribute;
4216-
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
4217+
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
42174218
public Akka.Actor.IActorRef Context { get; }
42184219
public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; }
42194220
public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; }
@@ -4228,7 +4229,7 @@ namespace Akka.Streams.Implementation.Fusing
42284229
public int Execute(int eventLimit) { }
42294230
public void Finish() { }
42304231
public void Init(Akka.Streams.IMaterializer subMaterializer) { }
4231-
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action<object> handler) { }
4232+
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
42324233
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { }
42334234
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { }
42344235
public override string ToString() { }
@@ -4878,12 +4879,13 @@ namespace Akka.Streams.Stage
48784879
}
48794880
public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging
48804881
{
4881-
public static System.Action DoNothing;
4882+
public static readonly System.Action DoNothing;
48824883
public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput;
48834884
public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput;
48844885
public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput;
48854886
public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput;
48864887
public readonly int InCount;
4888+
public static readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> NoPromise;
48874889
public readonly int OutCount;
48884890
public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput;
48894891
protected GraphStageLogic(int inCount, int outCount) { }
@@ -4922,6 +4924,7 @@ namespace Akka.Streams.Stage
49224924
protected Akka.Streams.Stage.IOutHandler GetHandler<T>(Akka.Streams.Outlet<T> outlet) { }
49234925
[Akka.Annotations.ApiMayChangeAttribute()]
49244926
protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { }
4927+
protected Akka.Streams.Stage.IAsyncCallback<T> GetTypedAsyncCallback<T>(System.Action<T> handler) { }
49254928
protected T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
49264929
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
49274930
[Akka.Annotations.InternalApiAttribute()]
@@ -5010,6 +5013,11 @@ namespace Akka.Streams.Stage
50105013
protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes);
50115014
public virtual Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.NotUsed> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
50125015
}
5016+
public interface IAsyncCallback<in T>
5017+
{
5018+
void Invoke(T input);
5019+
System.Threading.Tasks.Task<Akka.Done> InvokeWithFeedback(T input);
5020+
}
50135021
public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext
50145022
{
50155023
Akka.Streams.Stage.AsyncCallback GetAsyncCallback();

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3949,7 +3949,8 @@ namespace Akka.Streams.Implementation.Fusing
39493949
public readonly object Event;
39503950
public readonly System.Action<object> Handler;
39513951
public readonly Akka.Streams.Stage.GraphStageLogic Logic;
3952-
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action<object> handler) { }
3952+
public readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> Promise;
3953+
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
39533954
public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; }
39543955
}
39553956
public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic
@@ -4175,7 +4176,7 @@ namespace Akka.Streams.Implementation.Fusing
41754176
public readonly Akka.Streams.Stage.GraphStageLogic[] Logics;
41764177
public readonly Akka.Streams.IMaterializer Materializer;
41774178
public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null;
4178-
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> OnAsyncInput;
4179+
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> OnAsyncInput;
41794180
public const int OutClosed = 32;
41804181
public const int OutReady = 8;
41814182
public const int PullEndFlip = 10;
@@ -4186,7 +4187,7 @@ namespace Akka.Streams.Implementation.Fusing
41864187
public const int Pushing = 4;
41874188
public int RunningStagesCount;
41884189
public static readonly Akka.Streams.Attributes[] SingleNoAttribute;
4189-
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
4190+
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
41904191
public Akka.Actor.IActorRef Context { get; }
41914192
public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; }
41924193
public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; }
@@ -4201,7 +4202,7 @@ namespace Akka.Streams.Implementation.Fusing
42014202
public int Execute(int eventLimit) { }
42024203
public void Finish() { }
42034204
public void Init(Akka.Streams.IMaterializer subMaterializer) { }
4204-
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action<object> handler) { }
4205+
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
42054206
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { }
42064207
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { }
42074208
public override string ToString() { }
@@ -4851,12 +4852,13 @@ namespace Akka.Streams.Stage
48514852
}
48524853
public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging
48534854
{
4854-
public static System.Action DoNothing;
4855+
public static readonly System.Action DoNothing;
48554856
public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput;
48564857
public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput;
48574858
public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput;
48584859
public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput;
48594860
public readonly int InCount;
4861+
public static readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> NoPromise;
48604862
public readonly int OutCount;
48614863
public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput;
48624864
protected GraphStageLogic(int inCount, int outCount) { }
@@ -4895,6 +4897,7 @@ namespace Akka.Streams.Stage
48954897
protected Akka.Streams.Stage.IOutHandler GetHandler<T>(Akka.Streams.Outlet<T> outlet) { }
48964898
[Akka.Annotations.ApiMayChangeAttribute()]
48974899
protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { }
4900+
protected Akka.Streams.Stage.IAsyncCallback<T> GetTypedAsyncCallback<T>(System.Action<T> handler) { }
48984901
protected T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
48994902
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
49004903
[Akka.Annotations.InternalApiAttribute()]
@@ -4983,6 +4986,11 @@ namespace Akka.Streams.Stage
49834986
protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes);
49844987
public virtual Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.NotUsed> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
49854988
}
4989+
public interface IAsyncCallback<in T>
4990+
{
4991+
void Invoke(T input);
4992+
System.Threading.Tasks.Task<Akka.Done> InvokeWithFeedback(T input);
4993+
}
49864994
public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext
49874995
{
49884996
Akka.Streams.Stage.AsyncCallback GetAsyncCallback();

src/core/Akka.Streams.Tests/Dsl/HubSpec.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,38 @@ await Awaiting(async () =>
615615
}, Materializer);
616616
}
617617

618+
[Fact]
619+
public async Task BroadcastHub_must_handle_cancelled_Sink()
620+
{
621+
await this.AssertAllStagesStoppedAsync(async () =>
622+
{
623+
var upstream = this.CreatePublisherProbe<int>();
624+
var hubSource = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink<int>(4), Materializer);
625+
var downstream = this.CreateSubscriberProbe<int>();
626+
627+
hubSource.RunWith(Sink.Cancelled<int>(), Materializer);
628+
hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer);
629+
630+
await downstream.EnsureSubscriptionAsync();
631+
632+
await downstream.RequestAsync(10);
633+
await upstream.ExpectRequestAsync();
634+
await upstream.SendNextAsync(1);
635+
await downstream.ExpectNextAsync(1);
636+
await upstream.SendNextAsync(2);
637+
await downstream.ExpectNextAsync(2);
638+
await upstream.SendNextAsync(3);
639+
await downstream.ExpectNextAsync(3);
640+
await upstream.SendNextAsync(4);
641+
await downstream.ExpectNextAsync(4);
642+
await upstream.SendNextAsync(5);
643+
await downstream.ExpectNextAsync(5);
644+
645+
await upstream.SendCompleteAsync();
646+
await downstream.ExpectCompleteAsync();
647+
}, Materializer);
648+
}
649+
618650
[Fact]
619651
public async Task PartitionHub_must_work_in_the_happy_case_with_one_stream()
620652
{

0 commit comments

Comments
 (0)