Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 76 additions & 6 deletions src/contrib/cluster/Akka.DistributedData.Tests/ReplicatorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Numerics;
using System.Text;
Expand All @@ -33,7 +34,11 @@ static ReplicatorSpecs()
SpecConfig = ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0")
akka.remote.dot-netty.tcp.port = 0
akka.remote.dot-netty.tcp.send-buffer-size = 2000000
akka.remote.dot-netty.tcp.receive-buffer-size = 2000000
akka.remote.dot-netty.tcp.maximum-frame-size = 1000000
akka.cluster.sharding.updating-state-timeout = 15s")
.WithFallback(DistributedData.DefaultConfig());
}

Expand Down Expand Up @@ -503,7 +508,7 @@ public async Task Bugfix_4367_ORMultiValueDictionary_WithValueDeltas_DeltaGroup_

// Scenario 1 - add 1 entry with multiple values to all nodes
var keyA = "A";
var entryA = ImmutableHashSet<string>.Empty.Add("1").Add("2");
var entryA = ImmutableHashSet<string>.Empty.Add("1").Add("2").Add("3").Add("4");
await AwaitAssertAsync(async () => {
var m1 = await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
Expand Down Expand Up @@ -539,7 +544,7 @@ await AwaitAssertAsync(async () =>
});

// Scenario 3 - modify set with existing items in it
var entryA1 = entryA.Add("4");
var entryA1 = entryA.Add("6");
ORMultiValueDictionary<string, string> node2EntriesBCA = null;
await AwaitAssertAsync(async () =>
{
Expand All @@ -551,10 +556,10 @@ await AwaitAssertAsync(async () =>
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA1)));

node2EntriesBCA = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ);
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo("1", "2", "4");
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo("1", "2", "3", "4", "6");

var node3EntriesBCA = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node3EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "4");
node3EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "3", "4", "6");
});

// Trigger update from Node2 back to Node 1
Expand All @@ -570,11 +575,76 @@ await AwaitAssertAsync(async () =>
s => s.SetItems(Cluster.Cluster.Get(_sys2), keyA, entryA2)));

var node1EntriesBCA = changedProbe1.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node1EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "4", "5");
node1EntriesBCA["A"].Should().BeEquivalentTo("1", "2", "3", "4", "5", "6");
});
}

// Reproduction spec for issue #5663
[Fact]
public async Task ORMultiValueDictionary_WithValueDeltas_LargeDataSet()
{
await InitCluster();

var changedProbe2 = CreateTestProbe(_sys2);
_replicator2.Tell(Dsl.Subscribe(_keyJ, changedProbe2.Ref));

var changedProbe3 = CreateTestProbe(_sys3);
_replicator3.Tell(Dsl.Subscribe(_keyJ, changedProbe3.Ref));

var messages = Enumerable.Range(0, 20000).Select(i => i.ToString()).ToList();

// Scenario 1 - add 1 entry with multiple values to all nodes
var keyA = "A";
var entryA = messages.ToImmutableHashSet();

var stopwatch = Stopwatch.StartNew();
try
{
await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
ORMultiValueDictionary<string, string>.EmptyWithValueDeltas,
new WriteMajority(_timeOut),
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA)));
}
finally
{
stopwatch.Stop();
}
Log.Info($"Update time: {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / 1000.0} s)");

var node2EntriesA = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node2EntriesA[keyA].Should().BeEquivalentTo(entryA);

var node3EntriesA = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ)).Get(_keyJ).Entries;
node3EntriesA[keyA].Should().BeEquivalentTo(entryA);

// Scenario 2 - modify set with existing items in it
var entryA1 = entryA.Add("999999").Add("1000000");

stopwatch = Stopwatch.StartNew();
try
{
await _replicator1.Ask<UpdateSuccess>(Dsl.Update(
_keyJ,
ORMultiValueDictionary<string, string>.EmptyWithValueDeltas,
new WriteMajority(_timeOut),
s => s.SetItems(Cluster.Cluster.Get(_sys1), keyA, entryA1)));
}
finally
{
stopwatch.Stop();
}
Log.Info($"Single update time: {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / 1000.0} s)");

var node2Changed = changedProbe2.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ), TimeSpan.FromSeconds(3));
var node2EntriesBCA = node2Changed.Get(_keyJ);
node2EntriesBCA.Entries["A"].Should().BeEquivalentTo(entryA1);

var node3Changed = changedProbe3.ExpectMsg<Changed>(g => Equals(g.Key, _keyJ), TimeSpan.FromSeconds(3));
var node3EntriesBCA = node3Changed.Get(_keyJ).Entries;
node3EntriesBCA["A"].Should().BeEquivalentTo(entryA1);
}

protected override void BeforeTermination()
{
Shutdown(_sys1);
Expand Down
6 changes: 3 additions & 3 deletions src/contrib/cluster/Akka.DistributedData/ORSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,12 @@ public DeltaGroup(ImmutableArray<IReplicatedData> operations)

public IReplicatedData Merge(IReplicatedData other)
{
if (other is AddDeltaOperation)
if (other is AddDeltaOperation thatAdd)
{
// merge AddDeltaOp into last AddDeltaOp in the group, if possible
var last = Operations[Operations.Length - 1];
return last is AddDeltaOperation
? new DeltaGroup(Operations.SetItem(Operations.Length - 1, other.Merge(last)))
return last is AddDeltaOperation thisAdd
? new DeltaGroup(Operations.SetItem(Operations.Length - 1, thisAdd.Merge(thatAdd)))
: new DeltaGroup(Operations.Add(other));
}
else if (other is DeltaGroup @group)
Expand Down