Skip to content

Commit 7149cb3

Browse files
committed
Backport of akkadotnet#5965 - pass Akka.Cluster.Cluster into IDowningProvider directly
1 parent a4a6fdd commit 7149cb3

13 files changed

+163
-63
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace Akka.Cluster
1919
{
2020
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
2121
{
22-
public AutoDowning(Akka.Actor.ActorSystem system) { }
22+
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
2323
public System.TimeSpan DownRemovalMargin { get; }
2424
public Akka.Actor.Props DowningActorProps { get; }
2525
}
@@ -266,13 +266,13 @@ namespace Akka.Cluster
266266
}
267267
public sealed class NoDowning : Akka.Cluster.IDowningProvider
268268
{
269-
public NoDowning(Akka.Actor.ActorSystem system) { }
269+
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
270270
public System.TimeSpan DownRemovalMargin { get; }
271271
public Akka.Actor.Props DowningActorProps { get; }
272272
}
273273
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
274274
{
275-
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
275+
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
276276
public System.TimeSpan DownRemovalMargin { get; }
277277
public Akka.Actor.Props DowningActorProps { get; }
278278
public System.TimeSpan StableAfter { get; }
@@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
417417
}
418418
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
419419
{
420-
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
420+
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
421421
public System.TimeSpan DownRemovalMargin { get; }
422422
public Akka.Actor.Props DowningActorProps { get; }
423423
}

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace Akka.Cluster
1919
{
2020
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
2121
{
22-
public AutoDowning(Akka.Actor.ActorSystem system) { }
22+
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
2323
public System.TimeSpan DownRemovalMargin { get; }
2424
public Akka.Actor.Props DowningActorProps { get; }
2525
}
@@ -266,13 +266,13 @@ namespace Akka.Cluster
266266
}
267267
public sealed class NoDowning : Akka.Cluster.IDowningProvider
268268
{
269-
public NoDowning(Akka.Actor.ActorSystem system) { }
269+
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
270270
public System.TimeSpan DownRemovalMargin { get; }
271271
public Akka.Actor.Props DowningActorProps { get; }
272272
}
273273
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
274274
{
275-
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
275+
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
276276
public System.TimeSpan DownRemovalMargin { get; }
277277
public Akka.Actor.Props DowningActorProps { get; }
278278
public System.TimeSpan StableAfter { get; }
@@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
417417
}
418418
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
419419
{
420-
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
420+
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
421421
public System.TimeSpan DownRemovalMargin { get; }
422422
public Akka.Actor.Props DowningActorProps { get; }
423423
}

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace Akka.Cluster
1919
{
2020
public sealed class AutoDowning : Akka.Cluster.IDowningProvider
2121
{
22-
public AutoDowning(Akka.Actor.ActorSystem system) { }
22+
public AutoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
2323
public System.TimeSpan DownRemovalMargin { get; }
2424
public Akka.Actor.Props DowningActorProps { get; }
2525
}
@@ -266,13 +266,13 @@ namespace Akka.Cluster
266266
}
267267
public sealed class NoDowning : Akka.Cluster.IDowningProvider
268268
{
269-
public NoDowning(Akka.Actor.ActorSystem system) { }
269+
public NoDowning(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
270270
public System.TimeSpan DownRemovalMargin { get; }
271271
public Akka.Actor.Props DowningActorProps { get; }
272272
}
273273
public sealed class SplitBrainResolver : Akka.Cluster.IDowningProvider
274274
{
275-
public SplitBrainResolver(Akka.Actor.ActorSystem system) { }
275+
public SplitBrainResolver(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
276276
public System.TimeSpan DownRemovalMargin { get; }
277277
public Akka.Actor.Props DowningActorProps { get; }
278278
public System.TimeSpan StableAfter { get; }
@@ -417,7 +417,7 @@ namespace Akka.Cluster.SBR
417417
}
418418
public class SplitBrainResolverProvider : Akka.Cluster.IDowningProvider
419419
{
420-
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system) { }
420+
public SplitBrainResolverProvider(Akka.Actor.ActorSystem system, Akka.Cluster.Cluster cluster) { }
421421
public System.TimeSpan DownRemovalMargin { get; }
422422
public Akka.Actor.Props DowningActorProps { get; }
423423
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="Bugfix5962Spec.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading.Tasks;
10+
using Akka.Configuration;
11+
using Akka.TestKit;
12+
using FluentAssertions;
13+
using FluentAssertions.Extensions;
14+
using Xunit;
15+
using Xunit.Abstractions;
16+
using static FluentAssertions.FluentActions;
17+
18+
19+
namespace Akka.Cluster.Tests
20+
{
21+
public class Bugfix5962Spec : TestKit.Xunit2.TestKit
22+
{
23+
private static readonly Config Config = ConfigurationFactory.ParseString(@"
24+
akka {
25+
loglevel = INFO
26+
actor {
27+
provider = cluster
28+
default-dispatcher = {
29+
executor = channel-executor
30+
channel-executor.priority = normal
31+
}
32+
# Adding this part in combination with the SplitBrainResolverProvider causes the error
33+
internal-dispatcher = {
34+
executor = channel-executor
35+
channel-executor.priority = high
36+
}
37+
}
38+
remote {
39+
dot-netty.tcp {
40+
port = 15508
41+
hostname = ""127.0.0.1""
42+
}
43+
default-remote-dispatcher {
44+
executor = channel-executor
45+
channel-executor.priority = high
46+
}
47+
backoff-remote-dispatcher {
48+
executor = channel-executor
49+
channel-executor.priority = low
50+
}
51+
}
52+
cluster {
53+
seed-nodes = [""akka.tcp://[email protected]:15508""]
54+
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
55+
}
56+
}");
57+
58+
private readonly Type _timerMsgType;
59+
60+
public Bugfix5962Spec(ITestOutputHelper output): base(Config, nameof(Bugfix5962Spec), output)
61+
{
62+
_timerMsgType = Type.GetType("Akka.Actor.Scheduler.TimerScheduler+TimerMsg, Akka");
63+
}
64+
65+
[Fact]
66+
public async Task SBR_Should_work_with_channel_executor()
67+
{
68+
var latch = new TestLatch(1);
69+
var cluster = Cluster.Get(Sys);
70+
cluster.RegisterOnMemberUp(() =>
71+
{
72+
latch.CountDown();
73+
});
74+
75+
var selection = Sys.ActorSelection("akka://Bugfix5962Spec/system/cluster/core/daemon/downingProvider");
76+
77+
await Awaiting(() => selection.ResolveOne(1.Seconds()))
78+
.Should().NotThrowAsync("Downing provider should be alive. ActorSelection will throw an ActorNotFoundException if this fails");
79+
80+
// There should be no TimerMsg being sent to dead letter, this signals that the downing provider is dead
81+
await EventFilter.DeadLetter(_timerMsgType).ExpectAsync(0, async () =>
82+
{
83+
latch.Ready(1.Seconds());
84+
await Task.Delay(2.Seconds());
85+
});
86+
}
87+
}
88+
}

src/core/Akka.Cluster.Tests/DowningProviderSpec.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
namespace Akka.Cluster.Tests
1818
{
19-
class FailingDowningProvider : IDowningProvider
19+
internal class FailingDowningProvider : IDowningProvider
2020
{
21-
public FailingDowningProvider(ActorSystem system)
21+
public FailingDowningProvider(ActorSystem system, Cluster cluster)
2222
{
2323
}
2424

@@ -36,7 +36,7 @@ public Props DowningActorProps
3636
class DummyDowningProvider : IDowningProvider
3737
{
3838
public readonly AtomicBoolean ActorPropsAccessed = new AtomicBoolean(false);
39-
public DummyDowningProvider(ActorSystem system)
39+
public DummyDowningProvider(ActorSystem system, Cluster cluster)
4040
{
4141
}
4242

src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77

88
using System;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Akka.Actor;
1112
using Akka.Actor.Dsl;
1213
using Akka.Configuration;
1314
using Akka.Event;
1415
using Akka.TestKit;
1516
using Akka.Util;
1617
using Xunit;
18+
using Xunit.Abstractions;
1719

1820
namespace Akka.Cluster.Tests
1921
{
@@ -25,12 +27,13 @@ public class StartupWithOneThreadSpec : AkkaSpec
2527
akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1
2628
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
2729
akka.remote.dot-netty.tcp.port = 0
30+
akka.cluster.downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
31+
akka.cluster.split-brain-resolver.active-strategy = keep-majority
2832
");
2933

3034
private long _startTime;
3135

32-
public StartupWithOneThreadSpec() : base(Configuration)
33-
{
36+
public StartupWithOneThreadSpec(ITestOutputHelper output) : base(Configuration, output) {
3437
_startTime = MonotonicClock.GetTicks();
3538
}
3639

@@ -53,7 +56,7 @@ private Props TestProps
5356
}
5457

5558
[Fact]
56-
public void A_cluster_must_startup_with_one_dispatcher_thread()
59+
public async Task A_cluster_must_startup_with_one_dispatcher_thread()
5760
{
5861
// This test failed before fixing https://github.com/akkadotnet/akka.net/issues/1959 when adding a sleep before the
5962
// Await of GetClusterCoreRef in the Cluster extension constructor.
@@ -75,6 +78,11 @@ public void A_cluster_must_startup_with_one_dispatcher_thread()
7578
ExpectMsg("hello");
7679
ExpectMsg("hello");
7780
ExpectMsg("hello");
81+
82+
// perform a self-join
83+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds((3)));
84+
var selfAddress = cluster.SelfAddress;
85+
await cluster.JoinSeedNodesAsync(new[] { selfAddress }, cts.Token);
7886
}
7987
}
8088
}

src/core/Akka.Cluster/AutoDown.cs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ internal sealed class AutoDown : AutoDownBase
3030
/// TBD
3131
/// </summary>
3232
/// <param name="autoDownUnreachableAfter">TBD</param>
33+
/// <param name="cluster"></param>
3334
/// <returns>TBD</returns>
34-
public static Props Props(TimeSpan autoDownUnreachableAfter)
35+
public static Props Props(TimeSpan autoDownUnreachableAfter, Cluster cluster)
3536
{
36-
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
37+
return Actor.Props.Create(() => new AutoDown(autoDownUnreachableAfter, cluster));
3738
}
3839

3940
/// <summary>
@@ -76,14 +77,10 @@ public override int GetHashCode()
7677
}
7778

7879
private readonly Cluster _cluster;
79-
80-
/// <summary>
81-
/// TBD
82-
/// </summary>
83-
/// <param name="autoDownUnreachableAfter">TBD</param>
84-
public AutoDown(TimeSpan autoDownUnreachableAfter) : base(autoDownUnreachableAfter)
80+
81+
public AutoDown(TimeSpan autoDownUnreachableAfter, Cluster cluster) : base(autoDownUnreachableAfter)
8582
{
86-
_cluster = Cluster.Get(Context.System);
83+
_cluster = cluster;
8784
}
8885

8986
/// <summary>
@@ -276,20 +273,18 @@ private void Remove(UniqueAddress node)
276273
public sealed class AutoDowning : IDowningProvider
277274
{
278275
private readonly ActorSystem _system;
279-
280-
/// <summary>
281-
/// TBD
282-
/// </summary>
283-
/// <param name="system">TBD</param>
284-
public AutoDowning(ActorSystem system)
276+
private readonly Cluster _cluster;
277+
278+
public AutoDowning(ActorSystem system, Cluster cluster)
285279
{
286280
_system = system;
281+
_cluster = cluster;
287282
}
288283

289284
/// <summary>
290285
/// TBD
291286
/// </summary>
292-
public TimeSpan DownRemovalMargin => Cluster.Get(_system).Settings.DownRemovalMargin;
287+
public TimeSpan DownRemovalMargin => _cluster.Settings.DownRemovalMargin;
293288

294289
/// <summary>
295290
/// TBD
@@ -301,11 +296,11 @@ public Props DowningActorProps
301296
{
302297
get
303298
{
304-
var autoDownUnreachableAfter = Cluster.Get(_system).Settings.AutoDownUnreachableAfter;
299+
var autoDownUnreachableAfter = _cluster.Settings.AutoDownUnreachableAfter;
305300
if (!autoDownUnreachableAfter.HasValue)
306301
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");
307302

308-
return AutoDown.Props(autoDownUnreachableAfter.Value);
303+
return AutoDown.Props(autoDownUnreachableAfter.Value, _cluster);
309304
}
310305
}
311306
}

src/core/Akka.Cluster/Cluster.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public Cluster(ActorSystemImpl system)
132132
Scheduler = CreateScheduler(system);
133133

134134
// it has to be lazy - otherwise if downing provider will init a cluster itself, it will deadlock
135-
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system), LazyThreadSafetyMode.ExecutionAndPublication);
135+
_downingProvider = new Lazy<IDowningProvider>(() => Akka.Cluster.DowningProvider.Load(Settings.DowningProviderType, system, this), LazyThreadSafetyMode.ExecutionAndPublication);
136136

137137
//create supervisor for daemons under path "/system/cluster"
138138
_clusterDaemons = system.SystemActorOf(Props.Create(() => new ClusterDaemon(Settings)).WithDeploy(Deploy.Local), "cluster");

src/core/Akka.Cluster/Configuration/Cluster.conf

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ akka {
5959
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
6060
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
6161
#
62-
# If specified the value must be the fully qualified class name of a subclass of
63-
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
62+
# If specified the value must be the fully qualified class name of an implementation of
63+
# `Akka.Cluster.IDowningProvider` having two argument constructor:
64+
# - argument 1: accepting an `ActorSystem`
65+
# - argument 2: accepting an `Akka.Cluster.Cluster`
6466
downing-provider-class = ""
6567

6668
# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network

0 commit comments

Comments
 (0)