5
5
// </copyright>
6
6
//-----------------------------------------------------------------------
7
7
8
+ using System . Threading . Tasks ;
8
9
using Akka . Streams . Dsl ;
9
10
using Akka . TestKit ;
10
11
using FluentAssertions ;
@@ -24,55 +25,58 @@ public TestPublisherSubscriberSpec(ITestOutputHelper output = null) : base(outpu
24
25
}
25
26
26
27
[ Fact ]
27
- public void TestPublisher_and_TestSubscriber_should_have_all_events_accessible_from_manual_probes ( )
28
+ public async Task TestPublisher_and_TestSubscriber_should_have_all_events_accessible_from_manual_probes ( )
28
29
{
29
- this . AssertAllStagesStopped ( ( ) =>
30
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
30
31
{
31
32
var upstream = this . CreateManualPublisherProbe < int > ( ) ;
32
33
var downstream = this . CreateManualSubscriberProbe < int > ( ) ;
33
34
Source . FromPublisher ( upstream )
34
35
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer )
35
36
. Subscribe ( downstream ) ;
36
37
37
- var upstreamSubscription = upstream . ExpectSubscription ( ) ;
38
- object evt = downstream . ExpectEvent ( ) ;
38
+ var upstreamSubscription = await upstream . ExpectSubscriptionAsync ( ) ;
39
+ object evt = await downstream . ExpectEventAsync ( ) ;
39
40
evt . Should ( ) . BeOfType < TestSubscriber . OnSubscribe > ( ) ;
40
41
var downstreamSubscription = ( ( TestSubscriber . OnSubscribe ) evt ) . Subscription ;
41
42
42
43
upstreamSubscription . SendNext ( 1 ) ;
43
44
downstreamSubscription . Request ( 1 ) ;
44
- evt = upstream . ExpectEvent ( ) ;
45
+ evt = await upstream . ExpectEventAsync ( ) ;
45
46
evt . Should ( ) . BeOfType < TestPublisher . RequestMore > ( ) ;
46
47
( ( TestPublisher . RequestMore ) evt ) . NrOfElements . Should ( ) . Be ( 1 ) ;
47
- evt = downstream . ExpectEvent ( ) ;
48
+ evt = await downstream . ExpectEventAsync ( ) ;
48
49
evt . Should ( ) . BeOfType < TestSubscriber . OnNext < int > > ( ) ;
49
50
( ( TestSubscriber . OnNext < int > ) evt ) . Element . Should ( ) . Be ( 1 ) ;
50
51
51
52
upstreamSubscription . SendNext ( 1 ) ;
52
53
downstreamSubscription . Request ( 1 ) ;
53
- downstream . ExpectNext ( 1 ) ;
54
+ await downstream . ExpectNextAsync ( 1 ) . Task ;
54
55
55
56
upstreamSubscription . SendComplete ( ) ;
56
- evt = downstream . ExpectEvent ( ) ;
57
+ evt = await downstream . ExpectEventAsync ( ) ;
57
58
evt . Should ( ) . BeOfType < TestSubscriber . OnComplete > ( ) ;
58
59
} , Materializer ) ;
59
60
}
60
61
61
62
// "handle gracefully partial function that is not suitable" does not apply
62
63
63
64
[ Fact ]
64
- public void TestPublisher_and_TestSubscriber_should_properly_update_PendingRequest_in_ExpectRequest ( )
65
+ public async Task TestPublisher_and_TestSubscriber_should_properly_update_PendingRequest_in_ExpectRequest ( )
65
66
{
66
- var upstream = this . CreatePublisherProbe < int > ( ) ;
67
- var downstream = this . CreateSubscriberProbe < int > ( ) ;
67
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
68
+ {
69
+ var upstream = this . CreatePublisherProbe < int > ( ) ;
70
+ var downstream = this . CreateSubscriberProbe < int > ( ) ;
68
71
69
- Source . FromPublisher ( upstream ) . RunWith ( Sink . FromSubscriber ( downstream ) , Materializer ) ;
72
+ Source . FromPublisher ( upstream ) . RunWith ( Sink . FromSubscriber ( downstream ) , Materializer ) ;
70
73
71
- downstream . ExpectSubscription ( ) . Request ( 10 ) ;
74
+ ( await downstream . ExpectSubscriptionAsync ( ) ) . Request ( 10 ) ;
72
75
73
- upstream . ExpectRequest ( ) . Should ( ) . Be ( 10 ) ;
74
- upstream . SendNext ( 1 ) ;
75
- downstream . ExpectNext ( 1 ) ;
76
+ ( await upstream . ExpectRequestAsync ( ) ) . Should ( ) . Be ( 10 ) ;
77
+ upstream . SendNext ( 1 ) ;
78
+ await downstream . ExpectNextAsync ( 1 ) . Task ;
79
+ } , Materializer ) ;
76
80
}
77
81
}
78
82
}
0 commit comments