Skip to content

Commit ac1da6a

Browse files
authored
Event envelope updates (#170)
* Bump AkkaVersion to 1.4.14 * Add FluentAssertionsVersion to Common.Props and set the version to 4.14.0 * Backwards compatibility for akkadotnet/akka.net#4680
1 parent 4bd8ca1 commit ac1da6a

File tree

9 files changed

+35
-19
lines changed

9 files changed

+35
-19
lines changed

src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
</PackageReference>
1414
<PackageReference Include="xunit" Version="$(XunitVersion)" />
1515
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
16-
<PackageReference Include="FluentAssertions" Version="5.10.3" />
16+
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
1717
<PackageReference Include="Mongo2Go" Version="2.2.16" />
1818
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
1919
</ItemGroup>

src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
7979
public void Bug80_CurrentEventsByTag_should_Recover_until_end()
8080
{
8181
var actor = Sys.ActorOf(TagActor.Props("y"));
82-
var msgCount = 1200;
82+
//increased this to test for non-collision with the generated timestamps
83+
var msgCount = 5000;
8384
actor.Tell(msgCount);
8485
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));
8586

@@ -96,7 +97,8 @@ public void Bug80_CurrentEventsByTag_should_Recover_until_end()
9697
public void Bug80_AllEventsByTag_should_Recover_all_messages()
9798
{
9899
var actor = Sys.ActorOf(TagActor.Props("y"));
99-
var msgCount = 1200;
100+
//increased this to test for non-collision with the generated timestamps
101+
var msgCount = 5000;
100102
actor.Tell(msgCount);
101103
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));
102104

@@ -176,7 +178,7 @@ public object ToJournal(object evt)
176178
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
177179
{
178180
var specString = @"
179-
akka.test.single-expect-default = 3s
181+
akka.test.single-expect-default = 10s
180182
akka.persistence {
181183
publish-plugin-commands = on
182184
journal {

src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace Akka.Persistence.MongoDb.Tests
1717
{
1818
[Collection("MongoDbSpec")]
19-
public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
19+
public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
2020
{
2121
public static readonly AtomicCounter Counter = new AtomicCounter(0);
2222
private readonly ITestOutputHelper _output;

src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public class JournalEntry
3535
[BsonElement("Manifest")]
3636
public string Manifest { get; set; }
3737

38-
3938
[BsonElement("Ordering")]
4039
public BsonTimestamp Ordering { get; set; }
4140

src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
317317
return new JournalEntry
318318
{
319319
Id = message.PersistenceId + "_" + message.SequenceNr,
320-
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
321320
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
322-
//Timestamp = new BsonTimestamp(0),
323321
IsDeleted = message.IsDeleted,
324322
Payload = payload,
325323
PersistenceId = message.PersistenceId,
@@ -338,9 +336,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
338336
return new JournalEntry
339337
{
340338
Id = message.PersistenceId + "_" + message.SequenceNr,
341-
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
342339
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
343-
//Timestamp = new BsonTimestamp(0),
344340
IsDeleted = message.IsDeleted,
345341
Payload = binary,
346342
PersistenceId = message.PersistenceId,
@@ -363,14 +359,24 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
363359
entry.PersistenceId,
364360
manifest,
365361
entry.IsDeleted,
366-
sender);
362+
sender,
363+
timestamp: entry.Ordering.Timestamp);
367364
}
368365

369366
var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest);
370367
if (!legacy)
371368
{
372369
var ser = _serialization.FindSerializerForType(typeof(Persistent));
373-
return ser.FromBinary<Persistent>((byte[]) entry.Payload);
370+
var output = ser.FromBinary<Persistent>((byte[])entry.Payload);
371+
372+
// backwards compatibility for https://github.com/akkadotnet/akka.net/pull/4680
373+
// it the timestamp is not defined in the binary payload
374+
if (output.Timestamp == 0L)
375+
{
376+
output = (Persistent)output.WithTimestamp(entry.Ordering.Timestamp);
377+
}
378+
379+
return output;
374380
}
375381

376382
int? serializerId = null;
@@ -396,14 +402,14 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
396402
}
397403

398404
if (deserialized is Persistent p)
399-
return p;
405+
return (Persistent)p.WithTimestamp(entry.Ordering.Timestamp);
400406

401-
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender);
407+
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
402408
}
403409
else // backwards compat for object serialization - Payload was already deserialized by BSON
404410
{
405411
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
406-
entry.IsDeleted, sender);
412+
entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
407413
}
408414

409415
}

src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ protected bool Replaying(object message)
113113
offset: new Sequence(replayed.Offset),
114114
persistenceId: replayed.Persistent.PersistenceId,
115115
sequenceNr: replayed.Persistent.SequenceNr,
116+
timestamp: replayed.Persistent.Timestamp,
116117
@event: replayed.Persistent.Payload));
117118

118119
CurrentOffset = replayed.Offset;

src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ protected Receive Replaying(int limit)
131131
offset: new Sequence(seqNr),
132132
persistenceId: PersistenceId,
133133
sequenceNr: seqNr,
134+
timestamp: replayed.Persistent.Timestamp,
134135
@event: replayed.Persistent.Payload));
135136
CurrentSequenceNr = seqNr + 1;
136137
Buffer.DeliverBuffer(TotalDemand);

src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ protected Receive Replaying(int limit)
125125
offset: new Sequence(replayed.Offset),
126126
persistenceId: replayed.Persistent.PersistenceId,
127127
sequenceNr: replayed.Persistent.SequenceNr,
128+
timestamp: replayed.Persistent.Timestamp,
128129
@event: replayed.Persistent.Payload));
129130

130131
CurrentOffset = replayed.Offset;

src/common.props

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
<Project>
22
<PropertyGroup>
3-
<Copyright>Copyright © 2013-2020 Akka.NET Project</Copyright>
3+
<Copyright>Copyright © 2013-2021 Akka.NET Project</Copyright>
44
<Authors>Akka.NET Contrib</Authors>
5-
<VersionPrefix>1.4.1</VersionPrefix>
5+
<VersionPrefix>1.4.14</VersionPrefix>
66
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
77
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB</PackageProjectUrl>
88
<PackageLicenseUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md</PackageLicenseUrl>
9-
<PackageReleaseNotes>Bump Akka version to 1.4.1</PackageReleaseNotes>
9+
<PackageReleaseNotes>
10+
Bump Akka version to 1.4.14
11+
Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo
12+
Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal
13+
Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n)
14+
</PackageReleaseNotes>
1015
<GenerateDocumentationFile>true</GenerateDocumentationFile>
1116
<Description>Akka Persistence journal and snapshot store backed by MongoDB database.</Description>
1217
<NoWarn>$(NoWarn);CS1591</NoWarn>
1318
</PropertyGroup>
1419
<PropertyGroup>
1520
<XunitVersion>2.4.1</XunitVersion>
1621
<TestSdkVersion>16.8.3</TestSdkVersion>
17-
<AkkaVersion>1.4.12</AkkaVersion>
22+
<AkkaVersion>1.4.14</AkkaVersion>
23+
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
1824
</PropertyGroup>
1925
<!-- SourceLink support for all Akka.NET projects -->
2026
<ItemGroup>

0 commit comments

Comments
 (0)