6
6
//-----------------------------------------------------------------------
7
7
8
8
using System ;
9
+ using System . Threading . Tasks ;
9
10
using Akka . Actor ;
10
11
using Akka . Configuration ;
11
12
using Akka . MultiNode . TestAdapter ;
14
15
using Akka . TestKit ;
15
16
using FluentAssertions ;
16
17
17
- namespace Akka . Cluster . Sharding . Tests
18
+ namespace Akka . Cluster . Sharding . Tests ;
19
+
20
+ public class ClusterShardingSingleShardPerEntitySpecConfig : MultiNodeClusterShardingConfig
18
21
{
19
- public class ClusterShardingSingleShardPerEntitySpecConfig : MultiNodeClusterShardingConfig
20
- {
21
- public RoleName First { get ; }
22
- public RoleName Second { get ; }
23
- public RoleName Third { get ; }
24
- public RoleName Fourth { get ; }
25
- public RoleName Fifth { get ; }
22
+ public RoleName First { get ; }
23
+ public RoleName Second { get ; }
24
+ public RoleName Third { get ; }
25
+ public RoleName Fourth { get ; }
26
+ public RoleName Fifth { get ; }
26
27
27
- public Config R1Config { get ; }
28
- public Config R2Config { get ; }
28
+ public Config R1Config { get ; }
29
+ public Config R2Config { get ; }
29
30
30
- public ClusterShardingSingleShardPerEntitySpecConfig ( )
31
- : base ( loglevel : "DEBUG" , additionalConfig : @"
31
+ public ClusterShardingSingleShardPerEntitySpecConfig ( )
32
+ : base ( loglevel : "DEBUG" , additionalConfig : @"
32
33
akka.cluster.sharding.updating-state-timeout = 1s
33
34
" )
34
- {
35
- First = Role ( "first" ) ;
36
- Second = Role ( "second" ) ;
37
- Third = Role ( "third" ) ;
38
- Fourth = Role ( "fourth" ) ;
39
- Fifth = Role ( "fifth" ) ;
40
-
41
- TestTransport = true ;
42
- }
35
+ {
36
+ First = Role ( "first" ) ;
37
+ Second = Role ( "second" ) ;
38
+ Third = Role ( "third" ) ;
39
+ Fourth = Role ( "fourth" ) ;
40
+ Fifth = Role ( "fifth" ) ;
41
+
42
+ TestTransport = true ;
43
43
}
44
+ }
44
45
45
- public class ClusterShardingSingleShardPerEntitySpec : MultiNodeClusterShardingSpec < ClusterShardingSingleShardPerEntitySpecConfig >
46
- {
47
- #region setup
46
+ public class ClusterShardingSingleShardPerEntitySpec : MultiNodeClusterShardingSpec < ClusterShardingSingleShardPerEntitySpecConfig >
47
+ {
48
+ #region setup
48
49
49
- private readonly Lazy < IActorRef > _region ;
50
+ private readonly Lazy < IActorRef > _region ;
50
51
51
- public ClusterShardingSingleShardPerEntitySpec ( )
52
- : this ( new ClusterShardingSingleShardPerEntitySpecConfig ( ) , typeof ( ClusterShardingSingleShardPerEntitySpec ) )
53
- {
54
- }
52
+ public ClusterShardingSingleShardPerEntitySpec ( )
53
+ : this ( new ClusterShardingSingleShardPerEntitySpecConfig ( ) , typeof ( ClusterShardingSingleShardPerEntitySpec ) )
54
+ {
55
+ }
55
56
56
- protected ClusterShardingSingleShardPerEntitySpec ( ClusterShardingSingleShardPerEntitySpecConfig config , Type type )
57
- : base ( config , type )
58
- {
59
- _region = new Lazy < IActorRef > ( ( ) => ClusterSharding . Get ( Sys ) . ShardRegion ( "Entity" ) ) ;
60
- }
57
+ protected ClusterShardingSingleShardPerEntitySpec ( ClusterShardingSingleShardPerEntitySpecConfig config , Type type )
58
+ : base ( config , type )
59
+ {
60
+ _region = new Lazy < IActorRef > ( ( ) => ClusterSharding . Get ( Sys ) . ShardRegion ( "Entity" ) ) ;
61
+ }
61
62
62
- private void Join ( RoleName from , RoleName to )
63
- {
64
- Join (
65
- from ,
66
- to ,
67
- ( ) => StartSharding (
68
- Sys ,
69
- typeName : "Entity" ,
70
- entityProps : Props . Create ( ( ) => new ShardedEntity ( ) ) ) ) ;
71
- }
72
-
73
- private void JoinAndAllocate ( RoleName node , int entityId )
63
+ private Task JoinAsync ( RoleName from , RoleName to )
64
+ {
65
+ return JoinAsync (
66
+ from ,
67
+ to ,
68
+ ( ) => StartSharding (
69
+ Sys ,
70
+ typeName : "Entity" ,
71
+ entityProps : Props . Create ( ( ) => new ShardedEntity ( ) ) ) ) ;
72
+ }
73
+
74
+ private async Task JoinAndAllocate ( RoleName node , int entityId )
75
+ {
76
+ await WithinAsync ( TimeSpan . FromSeconds ( 10 ) , async ( ) =>
74
77
{
75
- Within ( TimeSpan . FromSeconds ( 10 ) , ( ) =>
78
+ await JoinAsync ( node , Config . First ) ;
79
+ await RunOnAsync ( async ( ) =>
76
80
{
77
- Join ( node , Config . First ) ;
78
- RunOn ( ( ) =>
79
- {
80
- _region . Value . Tell ( entityId ) ;
81
+ _region . Value . Tell ( entityId ) ;
81
82
82
- ExpectMsg ( entityId ) ;
83
+ await ExpectMsgAsync ( entityId ) ;
83
84
84
- LastSender . Path . Should ( ) . Be ( _region . Value . Path / $ "{ entityId } " / $ "{ entityId } ") ;
85
- } , node ) ;
86
- } ) ;
87
- EnterBarrier ( $ "started-{ entityId } ") ;
88
- }
85
+ LastSender . Path . Should ( ) . Be ( _region . Value . Path / $ "{ entityId } " / $ "{ entityId } ") ;
86
+ } , node ) ;
87
+ } ) ;
88
+ await EnterBarrierAsync ( $ "started-{ entityId } ") ;
89
+ }
89
90
90
91
91
- #endregion
92
+ #endregion
92
93
93
- [ MultiNodeFact ]
94
- public void Cluster_sharding_with_single_shard_per_entity_specs ( )
95
- {
96
- Cluster_sharding_with_single_shard_per_entity_must_use_specified_region ( ) ;
97
- }
94
+ [ MultiNodeFact ]
95
+ public async Task Cluster_sharding_with_single_shard_per_entity_specs ( )
96
+ {
97
+ await Cluster_sharding_with_single_shard_per_entity_must_use_specified_region ( ) ;
98
+ }
98
99
99
- private void Cluster_sharding_with_single_shard_per_entity_must_use_specified_region ( )
100
- {
101
- JoinAndAllocate ( Config . First , 1 ) ;
102
- JoinAndAllocate ( Config . Second , 2 ) ;
103
- JoinAndAllocate ( Config . Third , 3 ) ;
104
- JoinAndAllocate ( Config . Fourth , 4 ) ;
105
- JoinAndAllocate ( Config . Fifth , 5 ) ;
100
+ private async Task Cluster_sharding_with_single_shard_per_entity_must_use_specified_region ( )
101
+ {
102
+ await JoinAndAllocate ( Config . First , 1 ) ;
103
+ await JoinAndAllocate ( Config . Second , 2 ) ;
104
+ await JoinAndAllocate ( Config . Third , 3 ) ;
105
+ await JoinAndAllocate ( Config . Fourth , 4 ) ;
106
+ await JoinAndAllocate ( Config . Fifth , 5 ) ;
106
107
107
- RunOn ( ( ) =>
108
- {
109
- // coordinator is on 'first', blackhole 3 other means that it can't update with WriteMajority
110
- TestConductor . Blackhole ( Config . First , Config . Third , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
111
- TestConductor . Blackhole ( Config . First , Config . Fourth , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
112
- TestConductor . Blackhole ( Config . First , Config . Fifth , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
113
-
114
- // shard 6 not allocated yet and due to the blackhole it will not be completed
115
- _region . Value . Tell ( 6 ) ;
116
-
117
- // shard 1 location is know by 'first' region, not involving coordinator
118
- _region . Value . Tell ( 1 ) ;
119
- ExpectMsg ( 1 ) ;
120
-
121
- // shard 2 location not known at 'first' region yet, but coordinator is on 'first' and should
122
- // be able to answer GetShardHome even though previous request for shard 4 has not completed yet
123
- _region . Value . Tell ( 2 ) ;
124
- ExpectMsg ( 2 ) ;
125
- LastSender . Path . Should ( ) . Be ( Node ( Config . Second ) / "system" / "sharding" / "Entity" / "2" / "2" ) ;
126
-
127
- TestConductor . PassThrough ( Config . First , Config . Third , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
128
- TestConductor . PassThrough ( Config . First , Config . Fourth , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
129
- TestConductor . PassThrough ( Config . First , Config . Fifth , ThrottleTransportAdapter . Direction . Both ) . Wait ( ) ;
130
- ExpectMsg ( 6 , TimeSpan . FromSeconds ( 10 ) ) ;
131
- } , Config . First ) ;
132
-
133
- EnterBarrier ( "after-1" ) ;
134
- }
108
+ await RunOnAsync ( async ( ) =>
109
+ {
110
+ // coordinator is on 'first', blackhole 3 other means that it can't update with WriteMajority
111
+ await TestConductor . BlackholeAsync ( Config . First , Config . Third , ThrottleTransportAdapter . Direction . Both ) ;
112
+ await TestConductor . BlackholeAsync ( Config . First , Config . Fourth , ThrottleTransportAdapter . Direction . Both ) ;
113
+ await TestConductor . BlackholeAsync ( Config . First , Config . Fifth , ThrottleTransportAdapter . Direction . Both ) ;
114
+
115
+ // shard 6 not allocated yet and due to the blackhole it will not be completed
116
+ _region . Value . Tell ( 6 ) ;
117
+
118
+ // shard 1 location is know by 'first' region, not involving coordinator
119
+ _region . Value . Tell ( 1 ) ;
120
+ await ExpectMsgAsync ( 1 ) ;
121
+
122
+ // shard 2 location not known at 'first' region yet, but coordinator is on 'first' and should
123
+ // be able to answer GetShardHome even though previous request for shard 4 has not completed yet
124
+ _region . Value . Tell ( 2 ) ;
125
+ await ExpectMsgAsync ( 2 ) ;
126
+ LastSender . Path . Should ( ) . Be ( await NodeAsync ( Config . Second ) / "system" / "sharding" / "Entity" / "2" / "2" ) ;
127
+
128
+ await TestConductor . PassThroughAsync ( Config . First , Config . Third , ThrottleTransportAdapter . Direction . Both ) ;
129
+ await TestConductor . PassThroughAsync ( Config . First , Config . Fourth , ThrottleTransportAdapter . Direction . Both ) ;
130
+ await TestConductor . PassThroughAsync ( Config . First , Config . Fifth , ThrottleTransportAdapter . Direction . Both ) ;
131
+ await ExpectMsgAsync ( 6 , TimeSpan . FromSeconds ( 10 ) ) ;
132
+ } , Config . First ) ;
133
+
134
+ await EnterBarrierAsync ( "after-1" ) ;
135
135
}
136
- }
136
+ }
0 commit comments