12
12
using System . Threading ;
13
13
using System . Threading . Tasks ;
14
14
using Akka . TestKit . Internal ;
15
+ using Nito . AsyncEx . Synchronous ;
15
16
16
17
namespace Akka . TestKit
17
18
{
@@ -58,14 +59,22 @@ public T FishForMessage<T>(Predicate<T> isMessage, TimeSpan? max = null, string
58
59
/// <param name="allMessages">If null then will be ignored. If not null then will be initially cleared, then filled with all the messages until <paramref name="isMessage"/> returns <c>true</c></param>
59
60
/// <returns>Returns the message that <paramref name="isMessage"/> matched</returns>
60
61
public T FishForMessage < T > ( Predicate < T > isMessage , ArrayList allMessages , TimeSpan ? max = null , string hint = "" )
62
+ {
63
+ var task = FishForMessageAsync < T > ( isMessage , allMessages , max , hint ) . AsTask ( ) ;
64
+ task . WaitAndUnwrapException ( ) ;
65
+ return task . Result ;
66
+ }
67
+
68
+ /// <inheritdoc cref="FishForMessage{T}(Predicate{T}, ArrayList, TimeSpan?, string)"/>
69
+ public async ValueTask < T > FishForMessageAsync < T > ( Predicate < T > isMessage , ArrayList allMessages , TimeSpan ? max = null , string hint = "" )
61
70
{
62
71
var maxValue = RemainingOrDilated ( max ) ;
63
72
var end = Now + maxValue ;
64
73
allMessages ? . Clear ( ) ;
65
74
while ( true )
66
75
{
67
76
var left = end - Now ;
68
- var msg = ReceiveOne ( left ) ;
77
+ var msg = await ReceiveOneAsync ( left ) . ConfigureAwait ( false ) ;
69
78
_assertions . AssertTrue ( msg != null , "Timeout ({0}) during fishForMessage{1}" , maxValue , string . IsNullOrEmpty ( hint ) ? "" : ", hint: " + hint ) ;
70
79
if ( msg is T msg1 && isMessage ( msg1 ) )
71
80
{
@@ -138,9 +147,20 @@ public async Task<ArrayList> WaitForRadioSilenceAsync(TimeSpan? max = null, uint
138
147
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
139
148
public object ReceiveOne ( TimeSpan ? max = null )
140
149
{
141
- MessageEnvelope envelope ;
142
- if ( TryReceiveOne ( out envelope , max , CancellationToken . None ) )
143
- return envelope . Message ;
150
+ var task = ReceiveOneAsync ( max ) . AsTask ( ) ;
151
+ task . WaitAndUnwrapException ( ) ;
152
+ var received = task . Result ;
153
+ return received ;
154
+ }
155
+
156
+ /// <inheritdoc cref="ReceiveOne(TimeSpan?)"/>
157
+ public async ValueTask < object > ReceiveOneAsync ( TimeSpan ? max = null )
158
+ {
159
+ var received = await TryReceiveOneAsync ( max , CancellationToken . None ) ;
160
+
161
+ if ( received . success )
162
+ return received . envelope . Message ;
163
+
144
164
return null ;
145
165
}
146
166
@@ -152,9 +172,19 @@ public object ReceiveOne(TimeSpan? max = null)
152
172
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
153
173
public object ReceiveOne ( CancellationToken cancellationToken )
154
174
{
155
- MessageEnvelope envelope ;
156
- if ( TryReceiveOne ( out envelope , Timeout . InfiniteTimeSpan , cancellationToken ) )
157
- return envelope . Message ;
175
+ var task = ReceiveOneAsync ( cancellationToken ) . AsTask ( ) ;
176
+ task . WaitAndUnwrapException ( ) ;
177
+ var received = task . Result ;
178
+ return received ;
179
+ }
180
+ /// <inheritdoc cref="ReceiveOne(CancellationToken)"/>
181
+ public async ValueTask < object > ReceiveOneAsync ( CancellationToken cancellationToken )
182
+ {
183
+ var received = await TryReceiveOneAsync ( Timeout . InfiniteTimeSpan , cancellationToken ) ;
184
+
185
+ if ( received . success )
186
+ return received . envelope . Message ;
187
+
158
188
return null ;
159
189
}
160
190
@@ -177,6 +207,12 @@ public bool TryReceiveOne(out MessageEnvelope envelope, TimeSpan? max = null)
177
207
return TryReceiveOne ( out envelope , max , CancellationToken . None ) ;
178
208
}
179
209
210
+ /// <inheritdoc cref="TryReceiveOne(out MessageEnvelope, TimeSpan?)"/>
211
+ public async ValueTask < ( bool success , MessageEnvelope envelope ) > TryReceiveOneAsync ( TimeSpan ? max = null )
212
+ {
213
+ return await TryReceiveOneAsync ( max , CancellationToken . None ) . ConfigureAwait ( false ) ;
214
+ }
215
+
180
216
/// <summary>
181
217
/// Receive one message from the internal queue of the TestActor within
182
218
/// the specified duration.
@@ -200,10 +236,16 @@ public bool TryReceiveOne(out MessageEnvelope envelope, TimeSpan? max, Cancellat
200
236
return InternalTryReceiveOne ( out envelope , max , cancellationToken , true ) ;
201
237
}
202
238
239
+ /// <inheritdoc cref="TryReceiveOne(out MessageEnvelope, TimeSpan?, CancellationToken)"/>
240
+ public async ValueTask < ( bool success , MessageEnvelope envelope ) > TryReceiveOneAsync ( TimeSpan ? max , CancellationToken cancellationToken )
241
+ {
242
+ return await InternalTryReceiveOneAsync ( max , cancellationToken , true ) . ConfigureAwait ( false ) ;
243
+ }
244
+
203
245
private bool InternalTryReceiveOne ( out MessageEnvelope envelope , TimeSpan ? max , CancellationToken cancellationToken , bool shouldLog )
204
246
{
205
247
var task = InternalTryReceiveOneAsync ( max , cancellationToken , shouldLog ) . AsTask ( ) ;
206
- task . Wait ( ) ;
248
+ task . WaitAndUnwrapException ( ) ;
207
249
var received = task . Result ;
208
250
envelope = received . envelope ;
209
251
return received . success ;
@@ -268,8 +310,18 @@ private bool InternalTryReceiveOne(out MessageEnvelope envelope, TimeSpan? max,
268
310
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
269
311
public object PeekOne ( TimeSpan ? max = null )
270
312
{
271
- if ( InternalTryPeekOne ( out var envelope , max , CancellationToken . None , true ) )
272
- return envelope . Message ;
313
+ var task = PeekOneAsync ( max ) . AsTask ( ) ;
314
+ task . WaitAndUnwrapException ( ) ;
315
+ var peeked = task . Result ;
316
+ return peeked ;
317
+ }
318
+
319
+ /// <inheritdoc cref="PeekOne(TimeSpan?)"/>
320
+ public async ValueTask < object > PeekOneAsync ( TimeSpan ? max = null )
321
+ {
322
+ var peeked = await TryPeekOneAsync ( max , CancellationToken . None ) ;
323
+ if ( peeked . success )
324
+ return peeked . envelope . Message ;
273
325
return null ;
274
326
}
275
327
@@ -281,8 +333,18 @@ public object PeekOne(TimeSpan? max = null)
281
333
/// <returns>The message if one was received; <c>null</c> otherwise</returns>
282
334
public object PeekOne ( CancellationToken cancellationToken )
283
335
{
284
- if ( InternalTryPeekOne ( out var envelope , Timeout . InfiniteTimeSpan , cancellationToken , true ) )
285
- return envelope . Message ;
336
+ var task = PeekOneAsync ( cancellationToken ) . AsTask ( ) ;
337
+ task . WaitAndUnwrapException ( ) ;
338
+ var peeked = task . Result ;
339
+ return peeked ;
340
+ }
341
+
342
+ /// <inheritdoc cref="PeekOne(CancellationToken)"/>
343
+ public async ValueTask < object > PeekOneAsync ( CancellationToken cancellationToken )
344
+ {
345
+ var peeked = await TryPeekOneAsync ( Timeout . InfiniteTimeSpan , cancellationToken ) ;
346
+ if ( peeked . success )
347
+ return peeked . envelope . Message ;
286
348
return null ;
287
349
}
288
350
@@ -304,6 +366,12 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max = null)
304
366
return InternalTryPeekOne ( out envelope , max , CancellationToken . None , true ) ;
305
367
}
306
368
369
+ /// <inheritdoc cref="TryPeekOne(out MessageEnvelope, TimeSpan?)"/>
370
+ public async ValueTask < ( bool success , MessageEnvelope envelope ) > TryPeekOneAsync ( TimeSpan ? max = null )
371
+ {
372
+ return await InternalTryPeekOneAsync ( max , CancellationToken . None , true ) ;
373
+ }
374
+
307
375
/// <summary>
308
376
/// Peek one message from the head of the internal queue of the TestActor within
309
377
/// the specified duration.
@@ -327,10 +395,16 @@ public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Cancellation
327
395
return InternalTryPeekOne ( out envelope , max , cancellationToken , true ) ;
328
396
}
329
397
398
+ /// <inheritdoc cref="TryPeekOne(out MessageEnvelope, TimeSpan?, CancellationToken)"/>
399
+ public async ValueTask < ( bool success , MessageEnvelope envelope ) > TryPeekOneAsync ( TimeSpan ? max , CancellationToken cancellationToken )
400
+ {
401
+ return await InternalTryPeekOneAsync ( max , cancellationToken , true ) ;
402
+ }
403
+
330
404
private bool InternalTryPeekOne ( out MessageEnvelope envelope , TimeSpan ? max , CancellationToken cancellationToken , bool shouldLog )
331
405
{
332
406
var task = InternalTryPeekOneAsync ( max , cancellationToken , shouldLog ) . AsTask ( ) ;
333
- task . Wait ( ) ;
407
+ task . WaitAndUnwrapException ( ) ;
334
408
var received = task . Result ;
335
409
envelope = received . envelope ;
336
410
return received . success ;
0 commit comments