Skip to content

Commit b6214ef

Browse files
Fixed IActorRef leak inside EventStream (#5720)
* reproduced #5717 Reproduced `IActorRef` leak inside the `EventStream` * cleaned up the `EventBusUnsubscriber` * close #5719 - cleaned up `EventStream` subscription management * added API approval For `Obsolete` attribute. * need to capture more data on why failures happen * harden bugfix5717specs
1 parent d055f46 commit b6214ef

File tree

4 files changed

+158
-163
lines changed

4 files changed

+158
-163
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3044,6 +3044,7 @@ namespace Akka.Event
30443044
public class EventStream : Akka.Event.LoggingBus
30453045
{
30463046
public EventStream(bool debug) { }
3047+
[System.ObsoleteAttribute("Should be removed in 1.5")]
30473048
public bool InitUnsubscriber(Akka.Actor.IActorRef unsubscriber, Akka.Actor.ActorSystem system) { }
30483049
public void StartUnsubscriber(Akka.Actor.Internal.ActorSystemImpl system) { }
30493050
public override bool Subscribe(Akka.Actor.IActorRef subscriber, System.Type channel) { }
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="Bugfix5717Specs.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading.Tasks;
10+
using Akka.Configuration;
11+
using Akka.TestKit;
12+
using Xunit;
13+
using Xunit.Abstractions;
14+
15+
namespace Akka.Tests.Event
16+
{
17+
public class Bugfix5717Specs : AkkaSpec
18+
{
19+
public Bugfix5717Specs(ITestOutputHelper output) : base(Config.Empty, output){}
20+
21+
/// <summary>
22+
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/5717
23+
/// </summary>
24+
[Fact]
25+
public async Task Should_unsubscribe_from_all_topics_on_Terminate()
26+
{
27+
var es = Sys.EventStream;
28+
var tm1 = 1;
29+
var tm2 = "FOO";
30+
var a1 = CreateTestProbe();
31+
var a2 = CreateTestProbe();
32+
33+
es.Subscribe(a1.Ref, typeof(int));
34+
es.Subscribe(a2.Ref, typeof(int));
35+
es.Subscribe(a2.Ref, typeof(string));
36+
es.Publish(tm1);
37+
es.Publish(tm2);
38+
a1.ExpectMsg(tm1);
39+
a2.ExpectMsg(tm1);
40+
a2.ExpectMsg(tm2);
41+
42+
// kill second test probe
43+
Watch(a2);
44+
Sys.Stop(a2);
45+
ExpectTerminated(a2);
46+
47+
/*
48+
* It's possible that the `Terminate` message may not have been processed by the
49+
* Unsubscriber yet, so we want to try this operation more than once to see if it
50+
* eventually executes the unsubscribe on the EventStream.
51+
*
52+
* If it still fails after multiple attempts, the issue is that the unsub was never
53+
* executed in the first place.
54+
*/
55+
await AwaitAssertAsync(async () =>
56+
{
57+
await EventFilter.DeadLetter().ExpectAsync(0, () =>
58+
{
59+
es.Publish(tm1);
60+
es.Publish(tm2);
61+
a1.ExpectMsg(tm1);
62+
});
63+
}, interval:TimeSpan.FromSeconds(250));
64+
}
65+
}
66+
}

src/core/Akka/Event/EventBusUnsubscriber.cs

Lines changed: 41 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66
//-----------------------------------------------------------------------
77

88
using Akka.Actor;
9-
using Akka.Actor.Internal;
109
using Akka.Annotations;
11-
using Akka.Dispatch;
12-
using Akka.Util.Internal;
1310

1411
namespace Akka.Event
1512
{
@@ -26,7 +23,7 @@ namespace Akka.Event
2623
/// watching a few actors too much - we opt for the 2nd choice here.
2724
/// </summary>
2825
[InternalApi]
29-
class EventStreamUnsubscriber : ActorBase
26+
internal class EventStreamUnsubscriber : ActorBase
3027
{
3128
private readonly EventStream _eventStream;
3229
private readonly bool _debug;
@@ -45,140 +42,83 @@ public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool
4542
_debug = debug;
4643

4744
}
48-
49-
/// <summary>
50-
/// TBD
51-
/// </summary>
52-
/// <param name="message">TBD</param>
53-
/// <returns>TBD</returns>
45+
5446
protected override bool Receive(object message)
5547
{
56-
return message.Match().With<Register>(register =>
48+
switch (message)
5749
{
58-
if (_debug)
59-
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
60-
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
61-
Context.Watch(register.Actor);
62-
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
63-
{
64-
if (_debug)
65-
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
66-
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
67-
Context.Unwatch(unregister.Actor);
68-
}).With<Terminated>(terminated =>
69-
{
70-
if (_debug)
71-
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
72-
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
73-
_eventStream.Unsubscribe(terminated.Actor);
74-
})
75-
.WasHandled;
50+
case Register register:
51+
{
52+
if (_debug)
53+
_eventStream.Publish(new Debug(GetType().Name, GetType(),
54+
$"watching {register.Actor} in order to unsubscribe from EventStream when it terminates"));
55+
Context.Watch(register.Actor);
56+
break;
57+
}
58+
case UnregisterIfNoMoreSubscribedChannels unregister:
59+
{
60+
if (_debug)
61+
_eventStream.Publish(new Debug(GetType().Name, GetType(),
62+
$"unwatching {unregister.Actor} since has no subscriptions"));
63+
Context.Unwatch(unregister.Actor);
64+
break;
65+
}
66+
case Terminated terminated:
67+
{
68+
if (_debug)
69+
_eventStream.Publish(new Debug(GetType().Name, GetType(),
70+
$"unsubscribe {terminated.ActorRef} from {_eventStream}, because it was terminated"));
71+
_eventStream.Unsubscribe(terminated.ActorRef);
72+
break;
73+
}
74+
default:
75+
return false;
76+
}
77+
78+
return true;
7679
}
7780

78-
/// <summary>
79-
/// TBD
80-
/// </summary>
8181
protected override void PreStart()
8282
{
8383
if (_debug)
84-
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
84+
_eventStream.Publish(new Debug(GetType().Name, GetType(),
8585
string.Format("registering unsubscriber with {0}", _eventStream)));
86-
_eventStream.InitUnsubscriber(Self, _system);
8786
}
8887

8988
/// <summary>
90-
/// TBD
89+
/// INTERNAL API
90+
///
91+
/// Registers a new subscriber to be death-watched and automatically unsubscribed.
9192
/// </summary>
9293
internal class Register
9394
{
94-
/// <summary>
95-
/// TBD
96-
/// </summary>
97-
/// <param name="actor">TBD</param>
9895
public Register(IActorRef actor)
9996
{
10097
Actor = actor;
10198
}
10299

103100
/// <summary>
104-
/// TBD
105-
/// </summary>
106-
public IActorRef Actor { get; private set; }
107-
}
108-
109-
110-
/// <summary>
111-
/// TBD
112-
/// </summary>
113-
internal class Terminated
114-
{
115-
/// <summary>
116-
/// TBD
117-
/// </summary>
118-
/// <param name="actor">TBD</param>
119-
public Terminated(IActorRef actor)
120-
{
121-
Actor = actor;
122-
}
123-
124-
/// <summary>
125-
/// TBD
101+
/// The actor we're going to deathwatch and automatically unsubscribe
126102
/// </summary>
127103
public IActorRef Actor { get; private set; }
128104
}
129105

130106
/// <summary>
131-
/// TBD
107+
/// INTERNAL API
108+
///
109+
/// Unsubscribes an actor that is no longer subscribed and does not need to be death-watched any longer.
132110
/// </summary>
133111
internal class UnregisterIfNoMoreSubscribedChannels
134112
{
135-
/// <summary>
136-
/// TBD
137-
/// </summary>
138-
/// <param name="actor">TBD</param>
139113
public UnregisterIfNoMoreSubscribedChannels(IActorRef actor)
140114
{
141115
Actor = actor;
142116
}
143117

144118
/// <summary>
145-
/// TBD
119+
/// The actor we're no longer going to death watch.
146120
/// </summary>
147121
public IActorRef Actor { get; private set; }
148122
}
149123
}
150-
151-
152-
153-
/// <summary>
154-
/// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names.
155-
/// This is needed if someone spins up more EventStreams using the same ActorSystem,
156-
/// each stream gets it's own unsubscriber.
157-
/// </summary>
158-
class EventStreamUnsubscribersProvider
159-
{
160-
private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0);
161-
private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider();
162-
163-
164-
/// <summary>
165-
/// TBD
166-
/// </summary>
167-
public static EventStreamUnsubscribersProvider Instance
168-
{
169-
get { return _instance; }
170-
}
171-
172-
/// <summary>
173-
/// TBD
174-
/// </summary>
175-
/// <param name="system">TBD</param>
176-
/// <param name="eventStream">TBD</param>
177-
/// <param name="debug">TBD</param>
178-
public void Start(ActorSystemImpl system, EventStream eventStream, bool debug)
179-
{
180-
system.SystemActorOf(Props.Create<EventStreamUnsubscriber>(eventStream, system, debug).WithDispatcher(Dispatchers.InternalDispatcherId),
181-
string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet()));
182-
}
183-
}
184124
}

0 commit comments

Comments
 (0)