Skip to content

Commit b58ee27

Browse files
ensure that all recovered Timestamps are done in Ticks format (#174)
* ensure that all recovered Timestamps are done in Ticks format * fixed timestamp conversion * updated comment
1 parent ac1da6a commit b58ee27

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,25 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
347347
};
348348
}
349349

350+
private static long ToTicks(BsonTimestamp bson)
351+
{
352+
353+
354+
// BSON Timestamps are stored natively as Unix epoch seconds + an ordinal value
355+
356+
// need to use BsonTimestamp.Timestamp because the ordinal value doesn't actually have any
357+
// bearing on the time - it's used to try to somewhat order the events that all occurred concurrently
358+
// according to the MongoDb clock. No need to include that data in the EventEnvelope.Timestamp field
359+
// which is used entirely for end-user purposes.
360+
//
361+
// See https://docs.mongodb.com/manual/reference/bson-types/#timestamps
362+
363+
return DateTimeOffset.FromUnixTimeSeconds(bson.Timestamp).Ticks;
364+
}
365+
350366
private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sender)
351367
{
368+
352369
if (_settings.LegacySerialization)
353370
{
354371
var manifest = string.IsNullOrEmpty(entry.Manifest) ? entry.Payload.GetType().TypeQualifiedName() : entry.Manifest;
@@ -360,7 +377,7 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
360377
manifest,
361378
entry.IsDeleted,
362379
sender,
363-
timestamp: entry.Ordering.Timestamp);
380+
timestamp: ToTicks(entry.Ordering)); // MongoDb timestamps are stored as Unix Epoch
364381
}
365382

366383
var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest);
@@ -373,7 +390,7 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
373390
// it the timestamp is not defined in the binary payload
374391
if (output.Timestamp == 0L)
375392
{
376-
output = (Persistent)output.WithTimestamp(entry.Ordering.Timestamp);
393+
output = (Persistent)output.WithTimestamp(ToTicks(entry.Ordering));
377394
}
378395

379396
return output;
@@ -402,14 +419,14 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
402419
}
403420

404421
if (deserialized is Persistent p)
405-
return (Persistent)p.WithTimestamp(entry.Ordering.Timestamp);
422+
return (Persistent)p.WithTimestamp(ToTicks(entry.Ordering));
406423

407-
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
424+
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
408425
}
409426
else // backwards compat for object serialization - Payload was already deserialized by BSON
410427
{
411428
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
412-
entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
429+
entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering));
413430
}
414431

415432
}

0 commit comments

Comments
 (0)