Skip to content

Commit ec8a419

Browse files
authored
Stop failing remembered entity if supervisor strategy failed (#7720)
* Stop failing remembered entity if supervisor strategy failed * Update API Approval list * Fix logic
1 parent 857801c commit ec8a419

File tree

8 files changed

+524
-5
lines changed

8 files changed

+524
-5
lines changed
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="RememberEntitiesFailureSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Collections.Immutable;
11+
using System.Linq;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
using Akka.Actor;
15+
using Akka.Actor.Internal;
16+
using Akka.Cluster.Sharding.Internal;
17+
using Akka.Cluster.Tools.Singleton;
18+
using Akka.Configuration;
19+
using Akka.Event;
20+
using Akka.TestKit;
21+
using Akka.Util;
22+
using Akka.Util.Internal;
23+
using FluentAssertions;
24+
using FluentAssertions.Extensions;
25+
using Xunit;
26+
using Xunit.Abstractions;
27+
28+
namespace Akka.Cluster.Sharding.Tests;
29+
30+
public class RememberEntitiesSupervisionStrategyDecisionSpec : AkkaSpec
31+
{
32+
private sealed record EntityEnvelope(long Id, object Payload);
33+
34+
private class ConstructorFailActor : ActorBase
35+
{
36+
private readonly ILoggingAdapter _log = Context.GetLogger();
37+
38+
public ConstructorFailActor()
39+
{
40+
throw new Exception("EXPLODING CONSTRUCTOR!");
41+
}
42+
43+
protected override bool Receive(object message)
44+
{
45+
_log.Info("Msg {0}", message);
46+
Sender.Tell($"ack {message}");
47+
return true;
48+
}
49+
}
50+
51+
private class PreStartFailActor : ActorBase
52+
{
53+
private readonly ILoggingAdapter _log = Context.GetLogger();
54+
55+
protected override void PreStart()
56+
{
57+
base.PreStart();
58+
throw new Exception("EXPLODING PRE-START!");
59+
}
60+
61+
protected override bool Receive(object message)
62+
{
63+
_log.Info("Msg {0}", message);
64+
Sender.Tell($"ack {message}");
65+
return true;
66+
}
67+
}
68+
69+
private sealed class TestMessageExtractor: IMessageExtractor
70+
{
71+
public string EntityId(object message)
72+
=> message switch
73+
{
74+
EntityEnvelope env => env.Id.ToString(),
75+
_ => null
76+
};
77+
78+
public object EntityMessage(object message)
79+
=> message switch
80+
{
81+
EntityEnvelope env => env.Payload,
82+
_ => message
83+
};
84+
85+
public string ShardId(object message)
86+
=> message switch
87+
{
88+
EntityEnvelope msg => msg.Id.ToString(),
89+
_ => null
90+
};
91+
92+
public string ShardId(string entityId, object messageHint = null)
93+
=> entityId;
94+
}
95+
96+
private class FakeShardRegion : ReceiveActor
97+
{
98+
private readonly ClusterShardingSettings _settings;
99+
private readonly Props _entityProps;
100+
private IActorRef? _shard;
101+
102+
public FakeShardRegion(ClusterShardingSettings settings, Props entityProps)
103+
{
104+
_settings = settings;
105+
_entityProps = entityProps;
106+
107+
Receive<ShardInitialized>(_ =>
108+
{
109+
// no-op
110+
});
111+
Receive<ShardRegion.StartEntity>(msg =>
112+
{
113+
_shard.Forward(msg);
114+
});
115+
}
116+
117+
protected override void PreStart()
118+
{
119+
base.PreStart();
120+
var provider = new FakeStore(_settings, "cats");
121+
122+
var props = Props.Create(() => new Shard(
123+
"cats",
124+
"shard-1",
125+
_ => _entityProps,
126+
_settings,
127+
new TestMessageExtractor(),
128+
PoisonPill.Instance,
129+
provider,
130+
null
131+
));
132+
_shard = Context.ActorOf(props);
133+
}
134+
}
135+
136+
private class ShardStoreCreated
137+
{
138+
public ShardStoreCreated(IActorRef store, string shardId)
139+
{
140+
Store = store;
141+
ShardId = shardId;
142+
}
143+
144+
public IActorRef Store { get; }
145+
public string ShardId { get; }
146+
}
147+
148+
private class CoordinatorStoreCreated
149+
{
150+
public CoordinatorStoreCreated(IActorRef store)
151+
{
152+
Store = store;
153+
}
154+
155+
public IActorRef Store { get; }
156+
}
157+
158+
private class FakeStore : IRememberEntitiesProvider
159+
{
160+
public FakeStore(ClusterShardingSettings settings, string typeName)
161+
{
162+
}
163+
164+
public Props ShardStoreProps(string shardId)
165+
{
166+
return FakeShardStoreActor.Props(shardId);
167+
}
168+
169+
public Props CoordinatorStoreProps()
170+
{
171+
return FakeCoordinatorStoreActor.Props();
172+
}
173+
}
174+
175+
private class FakeShardStoreActor : ActorBase, IWithTimers
176+
{
177+
public static Props Props(string shardId) => Actor.Props.Create(() => new FakeShardStoreActor(shardId));
178+
179+
private readonly string _shardId;
180+
private readonly ILoggingAdapter _log = Context.GetLogger();
181+
182+
public FakeShardStoreActor(string shardId)
183+
{
184+
_shardId = shardId;
185+
Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId));
186+
}
187+
188+
public ITimerScheduler Timers { get; set; }
189+
190+
protected override bool Receive(object message)
191+
{
192+
switch (message)
193+
{
194+
case RememberEntitiesShardStore.GetEntities _:
195+
Sender.Tell(new RememberEntitiesShardStore.RememberedEntities(ImmutableHashSet<string>.Empty.Add("1")));
196+
return true;
197+
case RememberEntitiesShardStore.Update m:
198+
Sender.Tell(new RememberEntitiesShardStore.UpdateDone(m.Started, m.Stopped));
199+
return true;
200+
}
201+
return false;
202+
}
203+
}
204+
205+
private class FakeCoordinatorStoreActor : ActorBase, IWithTimers
206+
{
207+
public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor());
208+
209+
private readonly ILoggingAdapter _log = Context.GetLogger();
210+
211+
public ITimerScheduler Timers { get; set; }
212+
213+
public FakeCoordinatorStoreActor()
214+
{
215+
Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self));
216+
}
217+
218+
protected override bool Receive(object message)
219+
{
220+
switch (message)
221+
{
222+
case RememberEntitiesCoordinatorStore.GetShards _:
223+
Sender.Tell(new RememberEntitiesCoordinatorStore.RememberedShards(ImmutableHashSet<string>.Empty.Add("1")));
224+
return true;
225+
case RememberEntitiesCoordinatorStore.AddShard m:
226+
Sender.Tell(new RememberEntitiesCoordinatorStore.UpdateDone(m.ShardId));
227+
return true;
228+
}
229+
return false;
230+
}
231+
}
232+
233+
private class TestSupervisionStrategy: ShardSupervisionStrategy
234+
{
235+
private readonly AtomicCounter _counter;
236+
237+
public TestSupervisionStrategy(AtomicCounter counter, int maxRetry, int window, Func<Exception, Directive> localOnlyDecider)
238+
: base(maxRetry, window, localOnlyDecider)
239+
{
240+
_counter = counter;
241+
}
242+
243+
public override void ProcessFailure(IActorContext context, bool restart, IActorRef child, Exception cause, ChildRestartStats stats,
244+
IReadOnlyCollection<ChildRestartStats> children)
245+
{
246+
_counter.GetAndIncrement();
247+
base.ProcessFailure(context, restart, child, cause, stats, children);
248+
}
249+
}
250+
251+
private static Config SpecConfig =>
252+
ConfigurationFactory.ParseString(
253+
"""
254+
akka {
255+
loglevel = DEBUG
256+
actor.provider = cluster
257+
remote.dot-netty.tcp.port = 0
258+
259+
cluster.sharding {
260+
distributed-data.durable.keys = []
261+
state-store-mode = ddata
262+
remember-entities = on
263+
remember-entities-store = custom
264+
remember-entities-custom-store = "Akka.Cluster.Sharding.Tests.RememberEntitiesSupervisionStrategyDecisionSpec+FakeStore, Akka.Cluster.Sharding.Tests"
265+
verbose-debug-logging = on
266+
}
267+
}
268+
""")
269+
.WithFallback(ClusterSingleton.DefaultConfig())
270+
.WithFallback(ClusterSharding.DefaultConfig());
271+
272+
public RememberEntitiesSupervisionStrategyDecisionSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
273+
{
274+
}
275+
276+
protected override void AtStartup()
277+
{
278+
// Form a one node cluster
279+
var cluster = Cluster.Get(Sys);
280+
cluster.Join(cluster.SelfAddress);
281+
AwaitAssert(() =>
282+
{
283+
cluster.ReadView.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(1);
284+
});
285+
}
286+
287+
public Directive TestDecider(Exception cause)
288+
{
289+
return Directive.Restart;
290+
}
291+
292+
[Fact(DisplayName = "Persistent shard must stop remembered entity with excessive failures")]
293+
public async Task Persistent_Shard_must_stop_remembered_entity_with_excessive_restart_attempt()
294+
{
295+
var strategyCounter = new AtomicCounter(0);
296+
297+
var settings = ClusterShardingSettings.Create(Sys);
298+
settings = settings
299+
.WithTuningParameters(settings.TuningParameters.WithEntityRestartBackoff(0.1.Seconds()))
300+
.WithRememberEntities(true)
301+
.WithSupervisorStrategy(new TestSupervisionStrategy(strategyCounter, 3, 1000, TestDecider));
302+
303+
var storeProbe = CreateTestProbe();
304+
Sys.EventStream.Subscribe<ShardStoreCreated>(storeProbe);
305+
Sys.EventStream.Subscribe<Error>(TestActor);
306+
307+
var entityProps = Props.Create(() => new PreStartFailActor());
308+
await EventFilter.Error(contains: "cats: Remembered entity 1 was stopped: entity failed repeatedly")
309+
.ExpectOneAsync(async () =>
310+
{
311+
_ = Sys.ActorOf(Props.Create(() => new FakeShardRegion(settings, entityProps)));
312+
storeProbe.ExpectMsg<ShardStoreCreated>();
313+
await Task.Yield();
314+
});
315+
316+
// Failed on the 4th call
317+
strategyCounter.Current.Should().Be(4);
318+
}
319+
320+
[Fact(DisplayName = "Persistent shard must stop remembered entity when stopped using Directive.Stop decision")]
321+
public async Task Persistent_Shard_must_stop_remembered_entity_with_stop_directive_on_constructor_failure()
322+
{
323+
var strategyCounter = new AtomicCounter(0);
324+
325+
var settings = ClusterShardingSettings.Create(Sys);
326+
settings = settings
327+
.WithTuningParameters(settings.TuningParameters.WithEntityRestartBackoff(0.1.Seconds()))
328+
.WithRememberEntities(true)
329+
.WithSupervisorStrategy(new TestSupervisionStrategy(strategyCounter, 3, 1000, SupervisorStrategy.DefaultDecider.Decide));
330+
331+
var storeProbe = CreateTestProbe();
332+
Sys.EventStream.Subscribe<ShardStoreCreated>(storeProbe);
333+
Sys.EventStream.Subscribe<Error>(TestActor);
334+
335+
var entityProps = Props.Create(() => new ConstructorFailActor());
336+
await EventFilter.Error(contains: "cats: Remembered entity 1 was stopped: entity stopped by Directive.Stop decision")
337+
.ExpectOneAsync(async () =>
338+
{
339+
_ = Sys.ActorOf(Props.Create(() => new FakeShardRegion(settings, entityProps)));
340+
storeProbe.ExpectMsg<ShardStoreCreated>();
341+
await Task.Yield();
342+
});
343+
344+
// Failed on the 1st call
345+
strategyCounter.Current.Should().Be(1);
346+
}
347+
348+
}

src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardEntityFailureSpec.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity
139139
null
140140
));
141141

142-
Sys.EventStream.Subscribe<Error>(TestActor);
142+
var errorProbe = CreateTestProbe();
143+
Sys.EventStream.Subscribe<Error>(errorProbe);
143144

144145
var persistentShard = Sys.ActorOf(props);
145146

146147
persistentShard.Tell(new EntityEnvelope(1, "Start"));
147148

148149
// entity died here
149-
var err = ExpectMsg<Error>();
150+
var err = errorProbe.ExpectMsg<Error>();
150151
err.Cause.Should().BeOfType<ActorInitializationException>();
151152

152153
// Need to wait for the internal state to reset, else everything we sent will go to dead letter

0 commit comments

Comments
 (0)