Skip to content

Commit a7955cb

Browse files
Fix race condition in AllEventsSpec by using async/await (#7748)
Closes #7711
1 parent 1c44ee3 commit a7955cb

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

src/core/Akka.Persistence.TCK/Query/AllEventsSpec.cs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System;
99
using System.Collections.Generic;
1010
using System.Text;
11+
using System.Threading.Tasks;
1112
using Akka.Actor;
1213
using Akka.Configuration;
1314
using Akka.Persistence.Query;
@@ -70,7 +71,7 @@ public virtual void ReadJournal_query_AllEvents_should_find_new_events()
7071
}
7172

7273
[Fact]
73-
public virtual void ReadJournal_query_AllEvents_should_find_events_from_offset_exclusive()
74+
public virtual async Task ReadJournal_query_AllEvents_should_find_events_from_offset_exclusive()
7475
{
7576
var queries = ReadJournal as IAllEventsQuery;
7677

@@ -79,38 +80,39 @@ public virtual void ReadJournal_query_AllEvents_should_find_events_from_offset_e
7980
var c = Sys.ActorOf(Query.TestActor.Props("c"));
8081

8182
a.Tell("keep");
82-
ExpectMsg("keep-done");
83+
await ExpectMsgAsync("keep-done");
8384
a.Tell("calm");
84-
ExpectMsg("calm-done");
85+
await ExpectMsgAsync("calm-done");
8586
b.Tell("and");
86-
ExpectMsg("and-done");
87+
await ExpectMsgAsync("and-done");
8788
a.Tell("keep");
88-
ExpectMsg("keep-done");
89+
await ExpectMsgAsync("keep-done");
8990
a.Tell("streaming");
90-
ExpectMsg("streaming-done");
91+
await ExpectMsgAsync("streaming-done");
9192

9293
var eventSrc1 = queries.AllEvents(NoOffset.Instance);
9394
var probe1 = eventSrc1.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
9495
probe1.Request(4);
95-
probe1.ExpectNext<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 1L && p.Event.Equals("keep"));
96-
probe1.ExpectNext<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("calm"));
97-
probe1.ExpectNext<EventEnvelope>(p => p.PersistenceId == "b" && p.SequenceNr == 1L && p.Event.Equals("and"));
98-
var offs = probe1.ExpectNext<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 3L && p.Event.Equals("keep")).Offset;
96+
await probe1.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 1L && p.Event.Equals("keep"));
97+
await probe1.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("calm"));
98+
await probe1.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "b" && p.SequenceNr == 1L && p.Event.Equals("and"));
99+
var keepEvent = await probe1.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 3L && p.Event.Equals("keep"));
100+
var offs = keepEvent.Offset;
99101
probe1.Cancel();
100102

101103
var eventSrc2 = queries.AllEvents(offs);
102104
var probe2 = eventSrc2.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
103105
probe2.Request(10);
104106

105107
b.Tell("new");
106-
ExpectMsg("new-done");
108+
await ExpectMsgAsync("new-done");
107109
c.Tell("events");
108-
ExpectMsg("events-done");
110+
await ExpectMsgAsync("events-done");
109111

110112
// everything before "streaming" are not included, since exclusive offset
111-
probe2.ExpectNext<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 4L && p.Event.Equals("streaming"));
112-
probe2.ExpectNext<EventEnvelope>(p => p.PersistenceId == "b" && p.SequenceNr == 2L && p.Event.Equals("new"));
113-
probe2.ExpectNext<EventEnvelope>(p => p.PersistenceId == "c" && p.SequenceNr == 1L && p.Event.Equals("events"));
113+
await probe2.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "a" && p.SequenceNr == 4L && p.Event.Equals("streaming"));
114+
await probe2.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "b" && p.SequenceNr == 2L && p.Event.Equals("new"));
115+
await probe2.ExpectNextAsync<EventEnvelope>(p => p.PersistenceId == "c" && p.SequenceNr == 1L && p.Event.Equals("events"));
114116
probe2.Cancel();
115117
}
116118
}

0 commit comments

Comments
 (0)