7
7
8
8
using System ;
9
9
using System . Collections . Immutable ;
10
+ using System . Linq ;
10
11
using Akka . Actor ;
12
+ using Akka . Cluster ;
11
13
using Akka . Cluster . TestKit ;
12
14
using Akka . Configuration ;
13
15
using Akka . Remote . TestKit ;
@@ -48,31 +50,41 @@ public class DurablePruningSpec : MultiNodeClusterSpec
48
50
private readonly GCounterKey keyA = new GCounterKey ( "A" ) ;
49
51
private readonly IActorRef replicator ;
50
52
51
- protected DurablePruningSpec ( ) : this ( new DurablePruningSpecConfig ( ) )
53
+ public DurablePruningSpec ( ) : this ( new DurablePruningSpecConfig ( ) )
52
54
{
53
55
}
54
56
55
57
protected DurablePruningSpec ( DurablePruningSpecConfig config ) : base ( config , typeof ( DurablePruningSpec ) )
56
58
{
57
- InitialParticipantsValueFactory = Roles . Count ;
58
59
cluster = Akka . Cluster . Cluster . Get ( Sys ) ;
60
+ replicator = StartReplicator ( Sys ) ;
59
61
timeout = Dilated ( TimeSpan . FromSeconds ( 5 ) ) ;
60
62
}
61
63
62
- protected override int InitialParticipantsValueFactory { get ; }
64
+ protected override int InitialParticipantsValueFactory => Roles . Count ;
63
65
64
- [ MultiNodeFact ( Skip = "FIXME" ) ]
66
+ [ MultiNodeFact ]
65
67
public void Pruning_of_durable_CRDT_should_move_data_from_removed_node ( )
66
68
{
67
69
Join ( first , first ) ;
68
70
Join ( second , first ) ;
69
71
70
72
var sys2 = ActorSystem . Create ( Sys . Name , Sys . Settings . Config ) ;
71
73
var cluster2 = Akka . Cluster . Cluster . Get ( sys2 ) ;
74
+ var distributedData2 = DistributedData . Get ( sys2 ) ;
72
75
var replicator2 = StartReplicator ( sys2 ) ;
73
76
var probe2 = new TestProbe ( sys2 , new XunitAssertions ( ) ) ;
74
77
cluster2 . Join ( Node ( first ) . Address ) ;
75
78
79
+ AwaitAssert ( ( ) =>
80
+ {
81
+ cluster . State . Members . Count . ShouldBe ( 4 ) ;
82
+ cluster . State . Members . All ( m => m . Status == MemberStatus . Up ) . ShouldBe ( true ) ;
83
+ cluster2 . State . Members . Count . ShouldBe ( 4 ) ;
84
+ cluster2 . State . Members . All ( m => m . Status == MemberStatus . Up ) . ShouldBe ( true ) ;
85
+ } , TimeSpan . FromSeconds ( 10 ) ) ;
86
+ EnterBarrier ( "joined" ) ;
87
+
76
88
Within ( TimeSpan . FromSeconds ( 5 ) , ( ) => AwaitAssert ( ( ) =>
77
89
{
78
90
replicator . Tell ( Dsl . GetReplicaCount ) ;
@@ -81,10 +93,10 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
81
93
probe2 . ExpectMsg ( new ReplicaCount ( 4 ) ) ;
82
94
} ) ) ;
83
95
84
- replicator . Tell ( Dsl . Update ( keyA , GCounter . Empty , WriteLocal . Instance , c => c . Increment ( cluster ) ) ) ;
96
+ replicator . Tell ( Dsl . Update ( keyA , GCounter . Empty , WriteLocal . Instance , c => c . Increment ( cluster , 3 ) ) ) ;
85
97
ExpectMsg ( new UpdateSuccess ( keyA , null ) ) ;
86
98
87
- replicator2 . Tell ( Dsl . Update ( keyA , GCounter . Empty , WriteLocal . Instance , c => c . Increment ( cluster2 , 2 ) ) , probe2 . Ref ) ;
99
+ replicator2 . Tell ( Dsl . Update ( keyA , GCounter . Empty , WriteLocal . Instance , c => c . Increment ( cluster2 . SelfUniqueAddress , 2 ) ) , probe2 . Ref ) ;
88
100
probe2 . ExpectMsg ( new UpdateSuccess ( keyA , null ) ) ;
89
101
90
102
EnterBarrier ( "updates-done" ) ;
@@ -135,8 +147,9 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
135
147
136
148
RunOn ( ( ) =>
137
149
{
138
- var addr = cluster2 . SelfAddress ;
139
- var sys3 = ActorSystem . Create ( Sys . Name , ConfigurationFactory . ParseString ( @"
150
+ var address = cluster2 . SelfAddress ;
151
+ var sys3 = ActorSystem . Create ( Sys . Name , ConfigurationFactory . ParseString ( $@ "
152
+ akka.remote.dot-netty.tcp.port = { address . Port }
140
153
" ) . WithFallback ( Sys . Settings . Config ) ) ;
141
154
var cluster3 = Akka . Cluster . Cluster . Get ( sys3 ) ;
142
155
var replicator3 = StartReplicator ( sys3 ) ;
@@ -151,20 +164,28 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
151
164
replicator3 . Tell ( Dsl . Get ( keyA , ReadLocal . Instance ) , probe3 . Ref ) ;
152
165
var counter4 = probe3 . ExpectMsg < GetSuccess > ( ) . Get ( keyA ) ;
153
166
var value = counter4 . Value ;
154
- values . Add ( ( int ) value ) ;
167
+ values = values . Add ( ( int ) value ) ;
155
168
value . ShouldBe ( 10UL ) ;
156
169
counter4 . State . Count . ShouldBe ( 3 ) ;
157
170
} ) ;
158
171
values . ShouldBe ( ImmutableHashSet . Create ( 10 ) ) ;
159
172
} ) ;
160
173
174
+ // all must at least have seen it as joining
175
+ AwaitAssert ( ( ) =>
176
+ {
177
+ cluster3 . State . Members . Count . ShouldBe ( 4 ) ;
178
+ cluster3 . State . Members . All ( m => m . Status == MemberStatus . Up ) . ShouldBeTrue ( ) ;
179
+ } , TimeSpan . FromSeconds ( 10 ) ) ;
180
+
161
181
// after merging with others
162
182
replicator3 . Tell ( Dsl . Get ( keyA , new ReadAll ( RemainingOrDefault ) ) ) ;
163
183
var counter5 = ExpectMsg < GetSuccess > ( ) . Get ( keyA ) ;
164
184
counter5 . Value . ShouldBe ( 10UL ) ;
165
185
counter5 . State . Count . ShouldBe ( 3 ) ;
166
186
167
187
} , first ) ;
188
+
168
189
EnterBarrier ( "sys3-started" ) ;
169
190
170
191
replicator . Tell ( Dsl . Get ( keyA , new ReadAll ( RemainingOrDefault ) ) ) ;
0 commit comments