9
9
using System . Linq ;
10
10
using System . Threading . Tasks ;
11
11
using Akka . Actor ;
12
- using Akka . Remote . Serialization ;
13
12
using Akka . Remote . Transport ;
14
13
using Akka . TestKit ;
14
+ using Akka . TestKit . Extensions ;
15
15
using Google . Protobuf ;
16
16
using Xunit ;
17
17
18
18
namespace Akka . Remote . Tests . Transport
19
19
{
20
20
public abstract class GenericTransportSpec : AkkaSpec
21
21
{
22
- private Address addressATest = new Address ( "test" , "testsytemA" , "testhostA" , 4321 ) ;
23
- private Address addressBTest = new Address ( "test" , "testsytemB" , "testhostB" , 5432 ) ;
22
+ private readonly Address _addressATest = new Address ( "test" , "testsytemA" , "testhostA" , 4321 ) ;
23
+ private readonly Address _addressBTest = new Address ( "test" , "testsytemB" , "testhostB" , 5432 ) ;
24
24
25
- private Address addressA ;
26
- private Address addressB ;
27
- private Address nonExistingAddress ;
28
- private bool withAkkaProtocol ;
25
+ private Address _addressA ;
26
+ private Address _addressB ;
27
+ private Address _nonExistingAddress ;
28
+ private readonly bool _withAkkaProtocol ;
29
29
30
- public GenericTransportSpec ( bool withAkkaProtocol = false )
30
+ protected GenericTransportSpec ( bool withAkkaProtocol = false )
31
31
: base ( "akka.actor.provider = \" Akka.Remote.RemoteActorRefProvider, Akka.Remote\" " )
32
32
{
33
- this . withAkkaProtocol = withAkkaProtocol ;
33
+ _withAkkaProtocol = withAkkaProtocol ;
34
+ }
34
35
35
- addressA = addressATest . WithProtocol ( string . Format ( "{0}.{1}" , SchemeIdentifier , addressATest . Protocol ) ) ;
36
- addressB = addressBTest . WithProtocol ( string . Format ( "{0}.{1}" , SchemeIdentifier , addressBTest . Protocol ) ) ;
37
- nonExistingAddress = new Address ( SchemeIdentifier + ".test" , "nosystem" , "nohost" , 0 ) ;
36
+ public override async Task InitializeAsync ( )
37
+ {
38
+ await base . InitializeAsync ( ) ;
39
+
40
+ _addressA = _addressATest . WithProtocol ( $ "{ SchemeIdentifier } .{ _addressATest . Protocol } ") ;
41
+ _addressB = _addressBTest . WithProtocol ( $ "{ SchemeIdentifier } .{ _addressBTest . Protocol } ") ;
42
+ _nonExistingAddress = new Address ( SchemeIdentifier + ".test" , "nosystem" , "nohost" , 0 ) ;
38
43
}
39
44
40
45
private TimeSpan DefaultTimeout { get { return Dilated ( TestKitSettings . DefaultTimeout ) ; } }
@@ -45,7 +50,7 @@ public GenericTransportSpec(bool withAkkaProtocol = false)
45
50
46
51
private Akka . Remote . Transport . Transport WrapTransport ( Akka . Remote . Transport . Transport transport )
47
52
{
48
- if ( withAkkaProtocol ) {
53
+ if ( _withAkkaProtocol ) {
49
54
var provider = ( RemoteActorRefProvider ) ( ( ExtendedActorSystem ) Sys ) . Provider ;
50
55
51
56
return new AkkaProtocolTransport ( transport , Sys , new AkkaProtocolSettings ( provider . RemoteSettings . Config ) , new AkkaPduProtobuffCodec ( Sys ) ) ;
@@ -56,165 +61,153 @@ private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Tran
56
61
57
62
private Akka . Remote . Transport . Transport NewTransportA ( AssociationRegistry registry )
58
63
{
59
- return WrapTransport ( FreshTransport ( new TestTransport ( addressATest , registry ) ) ) ;
64
+ return WrapTransport ( FreshTransport ( new TestTransport ( _addressATest , registry ) ) ) ;
60
65
}
61
66
62
67
private Akka . Remote . Transport . Transport NewTransportB ( AssociationRegistry registry )
63
68
{
64
- return WrapTransport ( FreshTransport ( new TestTransport ( addressBTest , registry ) ) ) ;
69
+ return WrapTransport ( FreshTransport ( new TestTransport ( _addressBTest , registry ) ) ) ;
65
70
}
66
71
67
72
[ Fact ]
68
- public void Transport_must_return_an_Address_and_promise_when_listen_is_called ( )
73
+ public async Task Transport_must_return_an_Address_and_promise_when_listen_is_called ( )
69
74
{
70
75
var registry = new AssociationRegistry ( ) ;
71
76
var transportA = NewTransportA ( registry ) ;
72
77
73
- var result = AwaitResult ( transportA . Listen ( ) ) ;
78
+ var result = await transportA . Listen ( ) . WithTimeout ( DefaultTimeout ) ;
74
79
75
- Assert . Equal ( addressA , result . Item1 ) ;
80
+ Assert . Equal ( _addressA , result . Item1 ) ;
76
81
Assert . NotNull ( result . Item2 ) ;
77
82
78
- Assert . Contains ( registry . LogSnapshot ( ) . OfType < ListenAttempt > ( ) , x => x . BoundAddress == addressATest ) ;
83
+ Assert . Contains ( registry . LogSnapshot ( ) . OfType < ListenAttempt > ( ) , x => x . BoundAddress == _addressATest ) ;
79
84
}
80
85
81
86
[ Fact ]
82
- public void Transport_must_associate_successfully_with_another_transport_of_its_kind ( )
87
+ public async Task Transport_must_associate_successfully_with_another_transport_of_its_kind ( )
83
88
{
84
89
var registry = new AssociationRegistry ( ) ;
85
90
var transportA = NewTransportA ( registry ) ;
86
91
var transportB = NewTransportB ( registry ) ;
87
92
88
93
// Must complete the returned promise to receive events
89
- AwaitResult ( transportA . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
90
- AwaitResult ( transportB . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
94
+ ( await transportA . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
95
+ ( await transportB . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
91
96
92
- AwaitCondition ( ( ) => registry . TransportsReady ( addressATest , addressBTest ) ) ;
97
+ await AwaitConditionAsync ( ( ) => registry . TransportsReady ( _addressATest , _addressBTest ) ) ;
93
98
94
- transportA . Associate ( addressB ) ;
95
- ExpectMsgPf ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
99
+ // task is not awaited deliberately
100
+ var task = transportA . Associate ( _addressB ) ;
101
+ await ExpectMsgOfAsync ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
96
102
{
97
- var inbound = o as InboundAssociation ;
98
-
99
- if ( inbound != null && inbound . Association . RemoteAddress == addressA )
103
+ if ( o is InboundAssociation inbound && inbound . Association . RemoteAddress == _addressA )
100
104
return inbound . Association ;
101
105
102
106
return null ;
103
107
} ) ;
104
108
105
- Assert . Contains ( registry . LogSnapshot ( ) . OfType < AssociateAttempt > ( ) , x => x . LocalAddress == addressATest && x . RemoteAddress == addressBTest ) ;
106
- AwaitCondition ( ( ) => registry . ExistsAssociation ( addressATest , addressBTest ) ) ;
109
+ Assert . Contains ( registry . LogSnapshot ( ) . OfType < AssociateAttempt > ( ) , x => x . LocalAddress == _addressATest && x . RemoteAddress == _addressBTest ) ;
110
+ await AwaitConditionAsync ( ( ) => registry . ExistsAssociation ( _addressATest , _addressBTest ) ) ;
107
111
}
108
112
109
113
[ Fact ]
110
- public void Transport_must_fail_to_associate_with_nonexisting_address ( )
114
+ public async Task Transport_must_fail_to_associate_with_non_existing_address ( )
111
115
{
112
116
var registry = new AssociationRegistry ( ) ;
113
117
var transportA = NewTransportA ( registry ) ;
114
118
115
- AwaitResult ( transportA . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
116
- AwaitCondition ( ( ) => registry . TransportsReady ( addressATest ) ) ;
119
+ ( await transportA . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
120
+ await AwaitConditionAsync ( ( ) => registry . TransportsReady ( _addressATest ) ) ;
117
121
118
122
// Transport throws InvalidAssociationException when trying to associate with non-existing system
119
- XAssert . Throws < InvalidAssociationException > ( ( ) =>
120
- AwaitResult ( transportA . Associate ( nonExistingAddress ) )
123
+ await Assert . ThrowsAsync < InvalidAssociationException > ( async ( ) =>
124
+ await transportA . Associate ( _nonExistingAddress ) . WithTimeout ( DefaultTimeout )
121
125
) ;
122
126
}
123
127
124
128
[ Fact ]
125
- public void Transport_must_successfully_send_PDUs ( )
129
+ public async Task Transport_must_successfully_send_PDUs ( )
126
130
{
127
131
var registry = new AssociationRegistry ( ) ;
128
132
var transportA = NewTransportA ( registry ) ;
129
133
var transportB = NewTransportB ( registry ) ;
130
134
131
- AwaitResult ( transportA . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
132
- AwaitResult ( transportB . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
135
+ ( await transportA . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
136
+ ( await transportB . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
133
137
134
- AwaitCondition ( ( ) => registry . TransportsReady ( addressATest , addressBTest ) ) ;
138
+ await AwaitConditionAsync ( ( ) => registry . TransportsReady ( _addressATest , _addressBTest ) ) ;
135
139
136
- var associate = transportA . Associate ( addressB ) ;
137
- var handleB = ExpectMsgPf ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
140
+ var associate = transportA . Associate ( _addressB ) ;
141
+ var handleB = await ExpectMsgOfAsync ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
138
142
{
139
- var handle = o as InboundAssociation ;
140
- if ( handle != null && handle . Association . RemoteAddress == addressA )
143
+ if ( o is InboundAssociation handle && handle . Association . RemoteAddress == _addressA )
141
144
return handle . Association ;
142
145
143
146
return null ;
144
147
} ) ;
145
148
146
- var handleA = AwaitResult ( associate ) ;
149
+ var handleA = await associate . WithTimeout ( DefaultTimeout ) ;
147
150
148
151
// Initialize handles
149
152
handleA . ReadHandlerSource . SetResult ( new ActorHandleEventListener ( TestActor ) ) ;
150
153
handleB . ReadHandlerSource . SetResult ( new ActorHandleEventListener ( TestActor ) ) ;
151
154
152
155
var payload = ByteString . CopyFromUtf8 ( "PDU" ) ;
153
- var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec ( Sys ) . ConstructPayload ( payload ) : payload ;
156
+ var pdu = _withAkkaProtocol ? new AkkaPduProtobuffCodec ( Sys ) . ConstructPayload ( payload ) : payload ;
154
157
155
- AwaitCondition ( ( ) => registry . ExistsAssociation ( addressATest , addressBTest ) ) ;
158
+ await AwaitConditionAsync ( ( ) => registry . ExistsAssociation ( _addressATest , _addressBTest ) ) ;
156
159
157
160
handleA . Write ( payload ) ;
158
- ExpectMsgPf ( DefaultTimeout , "Expect InboundPayload from A" , o =>
161
+ await ExpectMsgOfAsync ( DefaultTimeout , "Expect InboundPayload from A" , o =>
159
162
{
160
- var inboundPayload = o as InboundPayload ;
161
-
162
- if ( inboundPayload != null && inboundPayload . Payload . Equals ( pdu ) )
163
+ if ( o is InboundPayload inboundPayload && inboundPayload . Payload . Equals ( pdu ) )
163
164
return inboundPayload . Payload ;
164
165
165
166
return null ;
166
167
} ) ;
167
168
168
- Assert . Contains ( registry . LogSnapshot ( ) . OfType < WriteAttempt > ( ) , x => x . Sender == addressATest && x . Recipient == addressBTest && x . Payload . Equals ( pdu ) ) ;
169
+ Assert . Contains ( registry . LogSnapshot ( ) . OfType < WriteAttempt > ( ) , x => x . Sender == _addressATest && x . Recipient == _addressBTest && x . Payload . Equals ( pdu ) ) ;
169
170
}
170
171
171
172
[ Fact ]
172
- public void Transport_must_successfully_disassociate ( )
173
+ public async Task Transport_must_successfully_disassociate ( )
173
174
{
174
175
var registry = new AssociationRegistry ( ) ;
175
176
var transportA = NewTransportA ( registry ) ;
176
177
var transportB = NewTransportB ( registry ) ;
177
178
178
- AwaitResult ( transportA . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
179
- AwaitResult ( transportB . Listen ( ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
179
+ ( await transportA . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
180
+ ( await transportB . Listen ( ) . WithTimeout ( DefaultTimeout ) ) . Item2 . SetResult ( new ActorAssociationEventListener ( TestActor ) ) ;
180
181
181
- AwaitCondition ( ( ) => registry . TransportsReady ( addressATest , addressBTest ) ) ;
182
+ await AwaitConditionAsync ( ( ) => registry . TransportsReady ( _addressATest , _addressBTest ) ) ;
182
183
183
- var associate = transportA . Associate ( addressB ) ;
184
- var handleB = ExpectMsgPf ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
184
+ var associate = transportA . Associate ( _addressB ) ;
185
+ var handleB = await ExpectMsgOfAsync ( DefaultTimeout , "Expect InboundAssociation from A" , o =>
185
186
{
186
- var handle = o as InboundAssociation ;
187
- if ( handle != null && handle . Association . RemoteAddress == addressA )
187
+ if ( o is InboundAssociation handle && handle . Association . RemoteAddress == _addressA )
188
188
return handle . Association ;
189
189
190
190
return null ;
191
191
} ) ;
192
192
193
- var handleA = AwaitResult ( associate ) ;
193
+ var handleA = await associate . WithTimeout ( DefaultTimeout ) ;
194
194
195
195
// Initialize handles
196
196
handleA . ReadHandlerSource . SetResult ( new ActorHandleEventListener ( TestActor ) ) ;
197
197
handleB . ReadHandlerSource . SetResult ( new ActorHandleEventListener ( TestActor ) ) ;
198
198
199
- AwaitCondition ( ( ) => registry . ExistsAssociation ( addressATest , addressBTest ) ) ;
199
+ await AwaitConditionAsync ( ( ) => registry . ExistsAssociation ( _addressATest , _addressBTest ) ) ;
200
200
201
- handleA . Disassociate ( ) ;
201
+ handleA . Disassociate ( "Disassociation test" , Log ) ;
202
202
203
- ExpectMsgPf ( DefaultTimeout , "Should receive Disassociated" , o => o as Disassociated ) ;
203
+ await ExpectMsgOfAsync ( DefaultTimeout , "Should receive Disassociated" , o => o as Disassociated ) ;
204
204
205
- AwaitCondition ( ( ) => ! registry . ExistsAssociation ( addressATest , addressBTest ) ) ;
205
+ await AwaitConditionAsync ( ( ) => ! registry . ExistsAssociation ( _addressATest , _addressBTest ) ) ;
206
206
207
- AwaitCondition ( ( ) =>
208
- registry . LogSnapshot ( ) . OfType < DisassociateAttempt > ( ) . Any ( x => x . Requestor == addressATest && x . Remote == addressBTest )
207
+ await AwaitConditionAsync ( ( ) =>
208
+ registry . LogSnapshot ( ) . OfType < DisassociateAttempt > ( ) . Any ( x => x . Requestor == _addressATest && x . Remote == _addressBTest )
209
209
) ;
210
210
}
211
-
212
- private T AwaitResult < T > ( Task < T > task )
213
- {
214
- task . Wait ( DefaultTimeout ) ;
215
-
216
- return task . Result ;
217
- }
218
211
}
219
212
}
220
213
0 commit comments