Skip to content

Commit 951493b

Browse files
committed
Rewrite actor ref sink as a graph stage
1 parent 6963557 commit 951493b

File tree

6 files changed

+155
-185
lines changed

6 files changed

+155
-185
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,6 +1896,8 @@ namespace Akka.Streams.Dsl
18961896
}
18971897
public class static Sink
18981898
{
1899+
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
1900+
[System.ObsoleteAttribute("Use overload accepting both on complete and on failure message")]
18991901
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage) { }
19001902
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRefWithAck<TIn>(Akka.Actor.IActorRef actorRef, object onInitMessage, object ackMessage, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage = null) { }
19011903
public static Akka.Streams.Dsl.Sink<TIn, Akka.Actor.IActorRef> ActorSubscriber<TIn>(Akka.Actor.Props props) { }
@@ -2738,25 +2740,14 @@ namespace Akka.Streams.Implementation
27382740
public void Subscribe(Reactive.Streams.ISubscriber<TOut> subscriber) { }
27392741
public System.Collections.Generic.IEnumerable<Reactive.Streams.ISubscriber<TOut>> TakePendingSubscribers() { }
27402742
}
2741-
public class ActorRefSinkActor : Akka.Streams.Actors.ActorSubscriber
2742-
{
2743-
protected readonly int HighWatermark;
2744-
protected readonly object OnCompleteMessage;
2745-
protected readonly Akka.Actor.IActorRef Ref;
2746-
public ActorRefSinkActor(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
2747-
protected Akka.Event.ILoggingAdapter Log { get; }
2748-
public override Akka.Streams.Actors.IRequestStrategy RequestStrategy { get; }
2749-
public static Akka.Actor.Props Props(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
2750-
protected override bool Receive(object message) { }
2751-
}
27522743
[Akka.Annotations.InternalApiAttribute()]
2753-
public sealed class ActorRefSink<TIn> : Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed>
2744+
public sealed class ActorRefSinkStage<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SinkShape<T>>
27542745
{
2755-
public ActorRefSink(Akka.Actor.IActorRef @ref, object onCompleteMessage, Akka.Streams.Attributes attributes, Akka.Streams.SinkShape<TIn> shape) { }
2756-
public override Akka.Streams.Attributes Attributes { get; }
2757-
public override object Create(Akka.Streams.MaterializationContext context, out Akka.NotUsed materializer) { }
2758-
protected override Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed> NewInstance(Akka.Streams.SinkShape<TIn> shape) { }
2759-
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
2746+
public ActorRefSinkStage(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
2747+
public Akka.Streams.Inlet<T> In { get; }
2748+
protected override Akka.Streams.Attributes InitialAttributes { get; }
2749+
public override Akka.Streams.SinkShape<T> Shape { get; }
2750+
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
27602751
}
27612752
[Akka.Annotations.InternalApiAttribute()]
27622753
public sealed class ActorRefSource<TOut> : Akka.Streams.Implementation.SourceModule<TOut, Akka.Actor.IActorRef>

src/core/Akka.Streams.Tests/Dsl/ActorRefSinkSpec.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// </copyright>
66
//-----------------------------------------------------------------------
77

8+
using System;
89
using Akka.Actor;
910
using Akka.Configuration;
1011
using Akka.Streams.Dsl;
@@ -58,5 +59,16 @@ public void ActorRefSink_should_cancel_a_stream_when_actor_terminates()
5859
Sys.Stop(fw);
5960
publisher.ExpectCancellation();
6061
}
62+
63+
[Fact]
64+
public void ActorRefSink_should_sends_error_message_if_upstream_fails()
65+
{
66+
var actorProbe = CreateTestProbe();
67+
var probe = this.SourceProbe<string>().To(Sink.ActorRef<string>(actorProbe.Ref, "complete", _ => "failure"))
68+
.Run(Materializer);
69+
70+
probe.SendError(new Exception("oh dear"));
71+
actorProbe.ExpectMsg("failure");
72+
}
6173
}
6274
}

src/core/Akka.Streams/Dsl/Sink.cs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,32 @@ public static Sink<TIn, NotUsed> OnComplete<TIn>(Action success, Action<Exceptio
442442
.To(Ignore<NotUsed>())
443443
.Named("OnCompleteSink");
444444

445+
/// <summary>
446+
/// INTERNAL API
447+
///
448+
/// <para>
449+
/// Sends the elements of the stream to the given <see cref="IActorRef"/>.
450+
/// If the target actor terminates the stream will be canceled.
451+
/// When the stream is completed successfully the given <paramref name="onCompleteMessage"/>
452+
/// will be sent to the destination actor.
453+
/// When the stream is completed with failure the <paramref name="onFailureMessage"/> will be
454+
/// invoked and its result will be sent to the destination actor.
455+
/// </para>
456+
/// <para>
457+
/// It will request at most <see cref="ActorMaterializerSettings.MaxInputBufferSize"/> number of elements from
458+
/// upstream, but there is no back-pressure signal from the destination actor,
459+
/// i.e. if the actor is not consuming the messages fast enough the mailbox
460+
/// of the actor will grow. For potentially slow consumer actors it is recommended
461+
/// to use a bounded mailbox with zero <see cref="BoundedMessageQueue.PushTimeOut"/> or use a rate
462+
/// limiting stage in front of this <see cref="Sink{TIn, TMat}"/>.
463+
/// </para>
464+
/// </summary>
465+
/// <typeparam name="TIn">TBD</typeparam>
466+
/// <param name="actorRef">TBD</param>
467+
/// <param name="onCompleteMessage">TBD</param>
468+
/// <param name="onFailureMessage">TBD</param>
469+
public static Sink<TIn, NotUsed> ActorRef<TIn>(IActorRef actorRef, object onCompleteMessage, Func<Exception, object> onFailureMessage)
470+
=> FromGraph(new ActorRefSinkStage<TIn>(actorRef, onCompleteMessage, onFailureMessage));
445471

446472
///<summary>
447473
/// Sends the elements of the stream to the given <see cref="IActorRef"/>.
@@ -462,8 +488,9 @@ public static Sink<TIn, NotUsed> OnComplete<TIn>(Action success, Action<Exceptio
462488
/// <param name="actorRef">TBD</param>
463489
/// <param name="onCompleteMessage">TBD</param>
464490
/// <returns>TBD</returns>
491+
[Obsolete("Use overload accepting both on complete and on failure message")]
465492
public static Sink<TIn, NotUsed> ActorRef<TIn>(IActorRef actorRef, object onCompleteMessage)
466-
=> new Sink<TIn, NotUsed>(new ActorRefSink<TIn>(actorRef, onCompleteMessage, DefaultAttributes.ActorRefSink, Shape<TIn>("ActorRefSink")));
493+
=> FromGraph(new ActorRefSinkStage<TIn>(actorRef, onCompleteMessage, ex => new Status.Failure(ex)));
467494

468495
/// <summary>
469496
/// Sends the elements of the stream to the given <see cref="IActorRef"/> that sends back back-pressure signal.

src/core/Akka.Streams/Implementation/ActorRefSinkActor.cs

Lines changed: 0 additions & 102 deletions
This file was deleted.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="ActorRefSinkStage.cs" company="Akka.NET Project">
3+
// Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using Akka.Actor;
10+
using Akka.Annotations;
11+
using Akka.Streams.Implementation.Stages;
12+
using Akka.Streams.Stage;
13+
14+
namespace Akka.Streams.Implementation
15+
{
16+
/// <summary>
17+
/// INTERNAL API
18+
/// </summary>
19+
[InternalApi]
20+
public sealed class ActorRefSinkStage<T> : GraphStage<SinkShape<T>>
21+
{
22+
#region Logic
23+
24+
private class Logic : GraphStageLogic, IInHandler
25+
{
26+
private readonly ActorRefSinkStage<T> _stage;
27+
private bool _completionSignalled;
28+
29+
public Logic(ActorRefSinkStage<T> stage)
30+
: base(stage.Shape)
31+
{
32+
_stage = stage;
33+
SetHandler(_stage.In, this);
34+
}
35+
36+
public override void PreStart()
37+
{
38+
base.PreStart();
39+
40+
GetStageActor(tuple =>
41+
{
42+
switch (tuple.Item2)
43+
{
44+
case Terminated terminated when ReferenceEquals(terminated.ActorRef, _stage._actorRef):
45+
CompleteStage();
46+
break;
47+
case object msg:
48+
Log.Error("Unexpected message to stage actor {0}", msg.GetType().Name);
49+
break;
50+
}
51+
}).Watch(_stage._actorRef);
52+
Pull(_stage.In);
53+
}
54+
55+
public override void PostStop()
56+
{
57+
if (!_completionSignalled)
58+
_stage._actorRef.Tell(_stage._onFailureMessage(new AbruptStageTerminationException(this)));
59+
base.PostStop();
60+
}
61+
62+
public void OnPush()
63+
{
64+
var next = Grab(_stage.In);
65+
_stage._actorRef.Tell(next, ActorRefs.NoSender);
66+
Pull(_stage.In);
67+
}
68+
69+
public void OnUpstreamFinish()
70+
{
71+
_completionSignalled = true;
72+
_stage._actorRef.Tell(_stage._onCompleteMessage, ActorRefs.NoSender);
73+
CompleteStage();
74+
}
75+
76+
public void OnUpstreamFailure(Exception ex)
77+
{
78+
_completionSignalled = true;
79+
_stage._actorRef.Tell(_stage._onFailureMessage(ex), ActorRefs.NoSender);
80+
FailStage(ex);
81+
}
82+
}
83+
84+
#endregion
85+
86+
private readonly IActorRef _actorRef;
87+
private readonly object _onCompleteMessage;
88+
private readonly Func<Exception, object> _onFailureMessage;
89+
90+
public Inlet<T> In { get; } = new Inlet<T>("ActorRefSink.in");
91+
92+
public ActorRefSinkStage(IActorRef actorRef, object onCompleteMessage, Func<Exception, object> onFailureMessage)
93+
{
94+
_actorRef = actorRef;
95+
_onCompleteMessage = onCompleteMessage;
96+
_onFailureMessage = onFailureMessage;
97+
98+
Shape = new SinkShape<T>(In);
99+
}
100+
101+
protected override Attributes InitialAttributes => DefaultAttributes.ActorRefSink;
102+
103+
public override SinkShape<T> Shape { get; }
104+
105+
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
106+
}
107+
}

src/core/Akka.Streams/Implementation/Sinks.cs

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -434,71 +434,6 @@ public override object Create(MaterializationContext context, out IActorRef mate
434434
}
435435
}
436436

437-
/// <summary>
438-
/// INTERNAL API
439-
/// </summary>
440-
/// <typeparam name="TIn">TBD</typeparam>
441-
[InternalApi]
442-
public sealed class ActorRefSink<TIn> : SinkModule<TIn, NotUsed>
443-
{
444-
private readonly IActorRef _ref;
445-
private readonly object _onCompleteMessage;
446-
private readonly Attributes _attributes;
447-
448-
/// <summary>
449-
/// TBD
450-
/// </summary>
451-
/// <param name="ref">TBD</param>
452-
/// <param name="onCompleteMessage">TBD</param>
453-
/// <param name="attributes">TBD</param>
454-
/// <param name="shape">TBD</param>
455-
public ActorRefSink(IActorRef @ref, object onCompleteMessage, Attributes attributes, SinkShape<TIn> shape)
456-
: base(shape)
457-
{
458-
_ref = @ref;
459-
_onCompleteMessage = onCompleteMessage;
460-
_attributes = attributes;
461-
}
462-
463-
/// <summary>
464-
/// TBD
465-
/// </summary>
466-
public override Attributes Attributes => _attributes;
467-
468-
/// <summary>
469-
/// TBD
470-
/// </summary>
471-
/// <param name="attributes">TBD</param>
472-
/// <returns>TBD</returns>
473-
public override IModule WithAttributes(Attributes attributes)
474-
=> new ActorRefSink<TIn>(_ref, _onCompleteMessage, attributes, AmendShape(attributes));
475-
476-
/// <summary>
477-
/// TBD
478-
/// </summary>
479-
/// <param name="shape">TBD</param>
480-
/// <returns>TBD</returns>
481-
protected override SinkModule<TIn, NotUsed> NewInstance(SinkShape<TIn> shape)
482-
=> new ActorRefSink<TIn>(_ref, _onCompleteMessage, _attributes, shape);
483-
484-
/// <summary>
485-
/// TBD
486-
/// </summary>
487-
/// <param name="context">TBD</param>
488-
/// <param name="materializer">TBD</param>
489-
/// <returns>TBD</returns>
490-
public override object Create(MaterializationContext context, out NotUsed materializer)
491-
{
492-
var actorMaterializer = ActorMaterializerHelper.Downcast(context.Materializer);
493-
var effectiveSettings = actorMaterializer.EffectiveSettings(context.EffectiveAttributes);
494-
var subscriberRef = actorMaterializer.ActorOf(context,
495-
ActorRefSinkActor.Props(_ref, effectiveSettings.MaxInputBufferSize, _onCompleteMessage));
496-
497-
materializer = null;
498-
return new ActorSubscriberImpl<TIn>(subscriberRef);
499-
}
500-
}
501-
502437
/// <summary>
503438
/// INTERNAL API
504439
/// </summary>

0 commit comments

Comments
 (0)