Skip to content

Commit 234188e

Browse files
HeartbeatNodeRing performance (#4943)
* added benchmark for HeartbeatNodeRing performance * switched to local function No perf change * approve Akka.Benchmarks friend assembly for Akka.Cluster * remove HeartbeatNodeRing.NodeRing() allocation and make field immutable * made it so Akka.Util.Internal.ArrayExtensions.From no longer allocates (much) * added some descriptive comments on HeartbeatNodeRing.Receivers * Replaced `Lazy<T>` with `Option<T>` and a similar lazy initialization check Improved throughput by ~10% on larger collections and further reduced memory allocation. * changed return types to `IImmutableSet` Did this in order to reduce allocations from constantly converting back and forth from `ImmutableSortedSet<T>` and `ImmutableHashSet<T>` - that way we can just use whatever the underlying collection type is. * added ReachabilityBenchmarks
1 parent ac07a0f commit 234188e

File tree

8 files changed

+258
-31
lines changed

8 files changed

+258
-31
lines changed

src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
<PackageReference Include="BenchmarkDotNet" Version="0.12.1" />
1111
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
1212
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
13+
<!-- FluentAssertions is used in some benchmarks to validate internal behaviors -->
14+
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
1315
</ItemGroup>
1416

1517
<ItemGroup>
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.Immutable;
4+
using System.Linq;
5+
using System.Text;
6+
using Akka.Actor;
7+
using Akka.Benchmarks.Configurations;
8+
using Akka.Cluster;
9+
using BenchmarkDotNet.Attributes;
10+
using FluentAssertions;
11+
12+
namespace Akka.Benchmarks.Cluster
13+
{
14+
[Config(typeof(MicroBenchmarkConfig))]
15+
public class HeartbeatNodeRingBenchmarks
16+
{
17+
[Params(10, 100, 250)]
18+
public int NodesSize;
19+
20+
21+
internal static HeartbeatNodeRing CreateHearbeatNodeRingOfSize(int size)
22+
{
23+
var nodes = Enumerable.Range(1, size)
24+
.Select(x => new UniqueAddress(new Address("akka", "sys", "node-" + x, 2552), x))
25+
.ToList();
26+
var selfAddress = nodes[size / 2];
27+
return new HeartbeatNodeRing(selfAddress, nodes.ToImmutableHashSet(), ImmutableHashSet<UniqueAddress>.Empty, 5);
28+
}
29+
30+
private HeartbeatNodeRing _ring;
31+
32+
[GlobalSetup]
33+
public void Setup()
34+
{
35+
_ring = CreateHearbeatNodeRingOfSize(NodesSize);
36+
}
37+
38+
private static void MyReceivers(HeartbeatNodeRing ring)
39+
{
40+
var r = new HeartbeatNodeRing(ring.SelfAddress, ring.Nodes, ImmutableHashSet<UniqueAddress>.Empty, ring.MonitoredByNumberOfNodes);
41+
r.MyReceivers.Value.Count.Should().BeGreaterThan(0);
42+
}
43+
44+
[Benchmark]
45+
[Arguments(1000)]
46+
public void HeartbeatNodeRing_should_produce_MyReceivers(int iterations)
47+
{
48+
for(var i = 0; i < iterations; i++)
49+
MyReceivers(_ring);
50+
}
51+
}
52+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using Akka.Util;
5+
using Akka.Actor;
6+
using Akka.Benchmarks.Configurations;
7+
using Akka.Cluster;
8+
using BenchmarkDotNet.Attributes;
9+
using FluentAssertions;
10+
11+
namespace Akka.Benchmarks.Cluster
12+
{
13+
[Config(typeof(MicroBenchmarkConfig))]
14+
public class ReachabilityBenchmarks
15+
{
16+
[Params(10, 100, 250)]
17+
public int NodesSize;
18+
19+
[Params(100)]
20+
public int Iterations;
21+
22+
public Address Address = new Address("akka", "sys", "a", 2552);
23+
public Address Node = new Address("akka", "sys", "a", 2552);
24+
25+
private Reachability CreateReachabilityOfSize(Reachability baseReachability, int size)
26+
{
27+
return Enumerable.Range(1, size).Aggregate(baseReachability, (r, i) =>
28+
{
29+
var obs = new UniqueAddress(Address.WithHost("node-" + i), i);
30+
var j = i == size ? 1 : i + 1;
31+
var subject = new UniqueAddress(Address.WithHost("node-" + j), j);
32+
return r.Unreachable(obs, subject).Reachable(obs, subject);
33+
});
34+
}
35+
36+
private Reachability AddUnreachable(Reachability baseReachability, int count)
37+
{
38+
var observers = baseReachability.Versions.Keys.Take(count);
39+
// the Keys HashSet<T> IEnumerator does not support Reset, hence why we have to convert it to a list
40+
using var subjects = baseReachability.Versions.Keys.ToList().GetContinuousEnumerator();
41+
return observers.Aggregate(baseReachability, (r, o) =>
42+
{
43+
return Enumerable.Range(1, 5).Aggregate(r, (r2, i) =>
44+
{
45+
subjects.MoveNext();
46+
return r2.Unreachable(o, subjects.Current);
47+
});
48+
});
49+
}
50+
51+
internal Reachability Reachability1;
52+
internal Reachability Reachability2;
53+
internal Reachability Reachability3;
54+
internal HashSet<UniqueAddress> Allowed;
55+
56+
[GlobalSetup]
57+
public void Setup()
58+
{
59+
Reachability1 = CreateReachabilityOfSize(Reachability.Empty, NodesSize);
60+
Reachability2 = CreateReachabilityOfSize(Reachability1, NodesSize);
61+
Reachability3 = AddUnreachable(Reachability1, NodesSize / 2);
62+
Allowed = Reachability1.Versions.Keys.ToHashSet();
63+
}
64+
65+
private void CheckThunkFor(Reachability r1, Reachability r2, Action<Reachability, Reachability> thunk,
66+
int times)
67+
{
68+
for (var i = 0; i < times; i++)
69+
thunk(new Reachability(r1.Records, r1.Versions), new Reachability(r2.Records, r2.Versions));
70+
}
71+
72+
private void CheckThunkFor(Reachability r1, Action<Reachability> thunk, int times)
73+
{
74+
for (var i = 0; i < times; i++)
75+
thunk(new Reachability(r1.Records, r1.Versions));
76+
}
77+
78+
private void Merge(Reachability r1, Reachability r2, int expectedRecords)
79+
{
80+
r1.Merge(Allowed, r2).Records.Count.Should().Be(expectedRecords);
81+
}
82+
83+
private void CheckStatus(Reachability r1)
84+
{
85+
var record = r1.Records.First();
86+
r1.Status(record.Observer, record.Subject).Should().Be(record.Status);
87+
}
88+
89+
private void CheckAggregatedStatus(Reachability r1)
90+
{
91+
var record = r1.Records.First();
92+
r1.Status(record.Subject).Should().Be(record.Status);
93+
}
94+
95+
private void AllUnreachableOrTerminated(Reachability r1)
96+
{
97+
r1.AllUnreachableOrTerminated.IsEmpty.Should().BeFalse();
98+
}
99+
100+
private void AllUnreachable(Reachability r1)
101+
{
102+
r1.AllUnreachable.IsEmpty.Should().BeFalse();
103+
}
104+
105+
private void RecordsFrom(Reachability r1)
106+
{
107+
foreach (var o in r1.AllObservers)
108+
{
109+
r1.RecordsFrom(o).Should().NotBeNull();
110+
}
111+
}
112+
113+
[Benchmark]
114+
public void Reachability_must_merge_with_same_versions()
115+
{
116+
CheckThunkFor(Reachability1, Reachability1, (r1, r2) => Merge(r1, r2, 0), Iterations);
117+
}
118+
119+
[Benchmark]
120+
public void Reachability_must_merge_with_all_older_versions()
121+
{
122+
CheckThunkFor(Reachability2, Reachability1, (r1, r2) => Merge(r1, r2, 0), Iterations);
123+
}
124+
125+
[Benchmark]
126+
public void Reachability_must_merge_with_all_newer_versions()
127+
{
128+
CheckThunkFor(Reachability1, Reachability2, (r1, r2) => Merge(r1, r2, 0), Iterations);
129+
}
130+
131+
[Benchmark]
132+
public void Reachability_must_merge_with_half_nodes_unreachable()
133+
{
134+
CheckThunkFor(Reachability1, Reachability3, (r1, r2) => Merge(r1, r2, 5* NodesSize/2), Iterations);
135+
}
136+
137+
[Benchmark]
138+
public void Reachability_must_merge_with_half_nodes_unreachable_opposite()
139+
{
140+
CheckThunkFor(Reachability3, Reachability1, (r1, r2) => Merge(r1, r2, 5 * NodesSize / 2), Iterations);
141+
}
142+
143+
[Benchmark]
144+
public void Reachability_must_check_status_with_half_nodes_unreachable()
145+
{
146+
CheckThunkFor(Reachability3, CheckAggregatedStatus, Iterations);
147+
}
148+
149+
[Benchmark]
150+
public void Reachability_must_check_AllUnreachableOrTerminated_with_half_nodes_unreachable()
151+
{
152+
CheckThunkFor(Reachability3, AllUnreachableOrTerminated, Iterations);
153+
}
154+
155+
[Benchmark]
156+
public void Reachability_must_check_AllUnreachable_with_half_nodes_unreachable()
157+
{
158+
CheckThunkFor(Reachability3, AllUnreachable, Iterations);
159+
}
160+
161+
[Benchmark]
162+
public void Reachability_must_check_RecordsFrom_with_half_nodes_unreachable()
163+
{
164+
CheckThunkFor(Reachability3, RecordsFrom, Iterations);
165+
}
166+
}
167+
}

src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
2+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")]
23
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Metrics")]
34
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
45
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]

src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ private FailureDetectorStub Fd(ClusterHeartbeatSenderState state, UniqueAddress
9797
public void ClusterHeartbeatSenderState_must_return_empty_active_set_when_no_nodes()
9898
{
9999
_emptyState
100-
.ActiveReceivers.IsEmpty.Should().BeTrue();
100+
.ActiveReceivers.Count.Should().Be(0);
101101
}
102102

103103
[Fact]
104104
public void ClusterHeartbeatSenderState_must_init_with_empty()
105105
{
106106
_emptyState.Init(ImmutableHashSet<UniqueAddress>.Empty, ImmutableHashSet<UniqueAddress>.Empty)
107-
.ActiveReceivers.IsEmpty.Should().BeTrue();
107+
.ActiveReceivers.Count.Should().Be(0);
108108
}
109109

110110
[Fact]

src/core/Akka.Cluster/ClusterHeartbeat.cs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ public ClusterHeartbeatSenderState(HeartbeatNodeRing ring, ImmutableHashSet<Uniq
429429
/// <summary>
430430
/// TBD
431431
/// </summary>
432-
public readonly ImmutableHashSet<UniqueAddress> ActiveReceivers;
432+
public readonly IImmutableSet<UniqueAddress> ActiveReceivers;
433433

434434
/// <summary>
435435
/// TBD
@@ -569,6 +569,7 @@ public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing ring = null, Immutable
569569
internal sealed class HeartbeatNodeRing
570570
{
571571
private readonly bool _useAllAsReceivers;
572+
private Option<IImmutableSet<UniqueAddress>> _myReceivers;
572573

573574
/// <summary>
574575
/// TBD
@@ -588,14 +589,15 @@ public HeartbeatNodeRing(
588589
{
589590
SelfAddress = selfAddress;
590591
Nodes = nodes;
592+
NodeRing = nodes.ToImmutableSortedSet(RingComparer.Instance);
591593
Unreachable = unreachable;
592594
MonitoredByNumberOfNodes = monitoredByNumberOfNodes;
593595

594596
if (!nodes.Contains(selfAddress))
595597
throw new ArgumentException($"Nodes [${string.Join(", ", nodes)}] must contain selfAddress [{selfAddress}]");
596598

597-
_useAllAsReceivers = MonitoredByNumberOfNodes >= (NodeRing().Count - 1);
598-
MyReceivers = new Lazy<ImmutableHashSet<UniqueAddress>>(() => Receivers(SelfAddress));
599+
_useAllAsReceivers = MonitoredByNumberOfNodes >= (NodeRing.Count - 1);
600+
_myReceivers = Option<IImmutableSet<UniqueAddress>>.None;
599601
}
600602

601603
/// <summary>
@@ -618,26 +620,34 @@ public HeartbeatNodeRing(
618620
/// </summary>
619621
public int MonitoredByNumberOfNodes { get; }
620622

621-
private ImmutableSortedSet<UniqueAddress> NodeRing()
622-
{
623-
return Nodes.ToImmutableSortedSet(RingComparer.Instance);
624-
}
623+
public ImmutableSortedSet<UniqueAddress> NodeRing { get; }
625624

626625
/// <summary>
627626
/// Receivers for <see cref="SelfAddress"/>. Cached for subsequent access.
628627
/// </summary>
629-
public readonly Lazy<ImmutableHashSet<UniqueAddress>> MyReceivers;
628+
public Option<IImmutableSet<UniqueAddress>> MyReceivers
629+
{
630+
get
631+
{
632+
if (_myReceivers.IsEmpty)
633+
{
634+
_myReceivers = new Option<IImmutableSet<UniqueAddress>>(Receivers(SelfAddress));
635+
}
636+
637+
return _myReceivers;
638+
}
639+
}
630640

631641
/// <summary>
632-
/// TBD
642+
/// The set of Akka.Cluster nodes designated for receiving heartbeats from this node.
633643
/// </summary>
634-
/// <param name="sender">TBD</param>
635-
/// <returns>TBD</returns>
636-
public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
644+
/// <param name="sender">The node sending heartbeats.</param>
645+
/// <returns>An organized ring of unique nodes.</returns>
646+
public IImmutableSet<UniqueAddress> Receivers(UniqueAddress sender)
637647
{
638648
if (_useAllAsReceivers)
639649
{
640-
return NodeRing().Remove(sender).ToImmutableHashSet();
650+
return NodeRing.Remove(sender);
641651
}
642652
else
643653
{
@@ -646,8 +656,7 @@ public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
646656
// The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must
647657
// be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown
648658
// at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes.
649-
Func<int, IEnumerator<UniqueAddress>, ImmutableSortedSet<UniqueAddress>, (int, ImmutableSortedSet<UniqueAddress>)> take = null;
650-
take = (n, iter, acc) =>
659+
(int, ImmutableSortedSet<UniqueAddress>) Take(int n, IEnumerator<UniqueAddress> iter, ImmutableSortedSet<UniqueAddress> acc)
651660
{
652661
if (iter.MoveNext() == false || n == 0)
653662
{
@@ -660,26 +669,26 @@ public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
660669
var isUnreachable = Unreachable.Contains(next);
661670
if (isUnreachable && acc.Count >= MonitoredByNumberOfNodes)
662671
{
663-
return take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
672+
return Take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
664673
}
665674
else if (isUnreachable)
666675
{
667-
return take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
676+
return Take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
668677
}
669678
else
670679
{
671-
return take(n - 1, iter, acc.Add(next)); // include the reachable
680+
return Take(n - 1, iter, acc.Add(next)); // include the reachable
672681
}
673682
}
674-
};
683+
}
675684

676-
var (remaining, slice1) = take(MonitoredByNumberOfNodes, NodeRing().From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty);
685+
var (remaining, slice1) = Take(MonitoredByNumberOfNodes, NodeRing.From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty);
677686

678687
IImmutableSet<UniqueAddress> slice = remaining == 0
679688
? slice1 // or, wrap-around
680-
: take(remaining, NodeRing().TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2;
689+
: Take(remaining, NodeRing.TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2;
681690

682-
return slice.ToImmutableHashSet();
691+
return slice;
683692
}
684693
}
685694

@@ -697,7 +706,7 @@ public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, ImmutableHashSet
697706
selfAddress ?? SelfAddress,
698707
nodes ?? Nodes,
699708
unreachable ?? Unreachable,
700-
monitoredByNumberOfNodes.HasValue ? monitoredByNumberOfNodes.Value : MonitoredByNumberOfNodes);
709+
monitoredByNumberOfNodes ?? MonitoredByNumberOfNodes);
701710
}
702711

703712
#region Operators

src/core/Akka.Cluster/Properties/AssemblyInfo.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
2424
[assembly: InternalsVisibleTo("Akka.Cluster.Metrics")]
2525
[assembly: InternalsVisibleTo("Akka.DistributedData")]
26+
[assembly: InternalsVisibleTo("Akka.Benchmarks")]
2627

2728
// Setting ComVisible to false makes the types in this assembly not visible
2829
// to COM components. If you need to access a type in this assembly from

0 commit comments

Comments
 (0)