Skip to content

Commit 0bb3ab3

Browse files
authored
Convert Akka.Streams.TestKit to async (#5793)
1 parent 98808df commit 0bb3ab3

File tree

11 files changed

+1384
-331
lines changed

11 files changed

+1384
-331
lines changed

src/core/Akka.Persistence.TCK/Query/CurrentEventsByPersistenceIdSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public virtual void ReadJournal_CurrentEventsByPersistenceId_should_not_see_new_
7474
var probe = src.Select(x => x.Event).RunWith(this.SinkProbe<object>(), Materializer)
7575
.Request(2)
7676
.ExpectNext("f-1", "f-2")
77-
.ExpectNoMsg(TimeSpan.FromMilliseconds(100)) as TestSubscriber.Probe<object>;
77+
.ExpectNoMsg(TimeSpan.FromMilliseconds(100)).Probe as TestSubscriber.Probe<object>;
7878

7979
pref.Tell("f-4");
8080
ExpectMsg("f-4-done");

src/core/Akka.Persistence.TCK/Query/EventsByPersistenceIdSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void ReadJournal_live_query_EventsByPersistenceId_should_find_new_events_
8181
var probe = src.Select(x => x.Event).RunWith(this.SinkProbe<object>(), Materializer)
8282
.Request(2)
8383
.ExpectNext("e-1", "e-2")
84-
.ExpectNoMsg(TimeSpan.FromMilliseconds(100)) as TestSubscriber.Probe<object>;
84+
.ExpectNoMsg(TimeSpan.FromMilliseconds(100)).Probe as TestSubscriber.Probe<object>;
8585

8686
pref.Tell("e-4");
8787
ExpectMsg("e-4-done");

src/core/Akka.Streams.TestKit/Akka.Streams.TestKit.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
88
<PackageTags>$(AkkaPackageTags);reactive;stream;testkit</PackageTags>
99
<GenerateDocumentationFile>true</GenerateDocumentationFile>
10+
<LangVersion>8.0</LangVersion>
1011
</PropertyGroup>
1112

1213
<ItemGroup>

src/core/Akka.Streams.TestKit/StreamTestKit.cs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9+
using System.Threading;
10+
using System.Threading.Tasks;
911
using Akka.TestKit;
1012
using Akka.Actor;
1113
using Akka.Streams.Implementation;
@@ -76,30 +78,57 @@ public void Cancel()
7678
PublisherProbe.Ref.Tell(new TestPublisher.CancelSubscription(this));
7779
}
7880

79-
public void ExpectRequest(long n)
81+
public void ExpectRequest(long n, CancellationToken cancellationToken = default)
8082
{
81-
PublisherProbe.ExpectMsg<TestPublisher.RequestMore>(
82-
x => x.NrOfElements == n && Equals(x.Subscription, this));
83+
ExpectRequestAsync(n, cancellationToken)
84+
.ConfigureAwait(false).GetAwaiter().GetResult();
8385
}
8486

85-
public long ExpectRequest()
87+
public async Task ExpectRequestAsync(long n, CancellationToken cancellationToken = default)
8688
{
87-
return
88-
PublisherProbe.ExpectMsg<TestPublisher.RequestMore>(x => Equals(this, x.Subscription)).NrOfElements;
89+
await PublisherProbe.ExpectMsgAsync<TestPublisher.RequestMore>(
90+
isMessage: x => x.NrOfElements == n && Equals(x.Subscription, this),
91+
cancellationToken: cancellationToken)
92+
.ConfigureAwait(false);
8993
}
9094

91-
public void ExpectCancellation()
95+
public long ExpectRequest(CancellationToken cancellationToken = default)
9296
{
93-
PublisherProbe.FishForMessage(msg =>
94-
{
95-
if (msg is TestPublisher.CancelSubscription &&
96-
Equals(((TestPublisher.CancelSubscription) msg).Subscription, this)) return true;
97-
if (msg is TestPublisher.RequestMore && Equals(((TestPublisher.RequestMore) msg).Subscription, this))
98-
return false;
99-
return false;
100-
});
97+
return ExpectRequestAsync(cancellationToken)
98+
.ConfigureAwait(false).GetAwaiter().GetResult();
10199
}
102100

101+
public async Task<long> ExpectRequestAsync(CancellationToken cancellationToken = default)
102+
{
103+
var msg = await PublisherProbe.ExpectMsgAsync<TestPublisher.RequestMore>(
104+
isMessage: x => Equals(this, x.Subscription),
105+
cancellationToken: cancellationToken)
106+
.ConfigureAwait(false);
107+
return msg.NrOfElements;
108+
}
109+
110+
public void ExpectCancellation(CancellationToken cancellationToken = default)
111+
{
112+
ExpectCancellationAsync(cancellationToken)
113+
.ConfigureAwait(false).GetAwaiter().GetResult();
114+
}
115+
116+
public async Task ExpectCancellationAsync(CancellationToken cancellationToken = default)
117+
{
118+
await PublisherProbe.FishForMessageAsync(
119+
isMessage: msg =>
120+
{
121+
return msg switch
122+
{
123+
TestPublisher.CancelSubscription cancel when Equals(cancel.Subscription, this) => true,
124+
TestPublisher.RequestMore more when Equals(more.Subscription, this) => false,
125+
_ => false
126+
};
127+
},
128+
cancellationToken: cancellationToken)
129+
.ConfigureAwait(false);
130+
}
131+
103132
public void SendNext(T element) => Subscriber.OnNext(element);
104133

105134
public void SendComplete() => Subscriber.OnComplete();
@@ -112,15 +141,14 @@ public void ExpectCancellation()
112141
internal sealed class ProbeSource<T> : SourceModule<T, TestPublisher.Probe<T>>
113142
{
114143
private readonly TestKitBase _testKit;
115-
private readonly Attributes _attributes;
116144

117145
public ProbeSource(TestKitBase testKit, Attributes attributes, SourceShape<T> shape) : base(shape)
118146
{
119147
_testKit = testKit;
120-
_attributes = attributes;
148+
Attributes = attributes;
121149
}
122150

123-
public override Attributes Attributes => _attributes;
151+
public override Attributes Attributes { get; }
124152

125153
public override IModule WithAttributes(Attributes attributes)
126154
{
@@ -129,7 +157,7 @@ public override IModule WithAttributes(Attributes attributes)
129157

130158
protected override SourceModule<T, TestPublisher.Probe<T>> NewInstance(SourceShape<T> shape)
131159
{
132-
return new ProbeSource<T>(_testKit, _attributes, shape);
160+
return new ProbeSource<T>(_testKit, Attributes, shape);
133161
}
134162

135163
public override IPublisher<T> Create(MaterializationContext context, out TestPublisher.Probe<T> materializer)
@@ -142,15 +170,14 @@ public override IPublisher<T> Create(MaterializationContext context, out TestPub
142170
internal sealed class ProbeSink<T> : SinkModule<T, TestSubscriber.Probe<T>>
143171
{
144172
private readonly TestKitBase _testKit;
145-
private readonly Attributes _attributes;
146173

147174
public ProbeSink(TestKitBase testKit, Attributes attributes, SinkShape<T> shape) : base(shape)
148175
{
149176
_testKit = testKit;
150-
_attributes = attributes;
177+
Attributes = attributes;
151178
}
152179

153-
public override Attributes Attributes => _attributes;
180+
public override Attributes Attributes { get; }
154181

155182
public override IModule WithAttributes(Attributes attributes)
156183
{
@@ -159,7 +186,7 @@ public override IModule WithAttributes(Attributes attributes)
159186

160187
protected override SinkModule<T, TestSubscriber.Probe<T>> NewInstance(SinkShape<T> shape)
161188
{
162-
return new ProbeSink<T>(_testKit, _attributes, shape);
189+
return new ProbeSink<T>(_testKit, Attributes, shape);
163190
}
164191

165192
public override object Create(MaterializationContext context, out TestSubscriber.Probe<T> materializer)

0 commit comments

Comments
 (0)