Skip to content

Commit 5610f95

Browse files
authored
Persistence fixes (#4892)
* snapshot RecoveryTick ignored, part of akka/akka#20753 * lastSequenceNr should reflect the snapshot sequence and not start with 0 when journal is empty. Migrated from akka/akka#27496 * Enforce valid seqnr for deletes, migrated from akka/akka#25488 * api approval
1 parent d27df3d commit 5610f95

File tree

7 files changed

+59
-25
lines changed

7 files changed

+59
-25
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ namespace Akka.Persistence
113113
public override int GetHashCode() { }
114114
public override string ToString() { }
115115
}
116-
public sealed class DeleteMessagesFailure : System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
116+
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
117117
{
118118
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
119119
public System.Exception Cause { get; }

src/core/Akka.Persistence.Tests/PersistentActorDeleteFailureSpec.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void PersistentActor_should_have_default_warn_logging_be_triggered_when_d
115115
{
116116
var pref = Sys.ActorOf(Props.Create(() => new DoesNotHandleDeleteFailureActor(Name)));
117117
Sys.EventStream.Subscribe(TestActor, typeof (Warning));
118-
pref.Tell(new DeleteTo(100));
118+
pref.Tell(new DeleteTo(long.MaxValue));
119119
var message = ExpectMsg<Warning>().Message.ToString();
120120
message.Contains("Failed to DeleteMessages").ShouldBeTrue();
121121
message.Contains("Boom! Unable to delete events!").ShouldBeTrue();
@@ -126,8 +126,8 @@ public void PersistentActor_should_receive_a_DeleteMessagesFailure_when_deletion
126126
{
127127
var pref = Sys.ActorOf(Props.Create(() => new HandlesDeleteFailureActor(Name, TestActor)));
128128
Sys.EventStream.Subscribe(TestActor, typeof (Warning));
129-
pref.Tell(new DeleteTo(100));
130-
ExpectMsg<DeleteMessagesFailure>(m => m.ToSequenceNr == 100);
129+
pref.Tell(new DeleteTo(long.MaxValue));
130+
ExpectMsg<DeleteMessagesFailure>();
131131
ExpectNoMsg(TimeSpan.FromMilliseconds(100));
132132
}
133133
}

src/core/Akka.Persistence.Tests/PersistentActorSpec.Actors.cs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ protected bool Receiver(object message)
3939
throw new ArgumentNullException("Received DeleteMessagesSuccess without anyone asking for delete!");
4040
AskedForDelete.Tell(message);
4141
}
42+
else if (message is DeleteMessagesFailure)
43+
{
44+
if (AskedForDelete == null)
45+
throw new ArgumentNullException("Received DeleteMessagesSuccess without anyone asking for delete!");
46+
AskedForDelete.Tell(message);
47+
}
48+
4249
else return false;
4350
return true;
4451
}
@@ -144,7 +151,7 @@ protected bool UpdateState(object message)
144151
if (message is Evt)
145152
Events = Events.AddFirst((message as Evt).Data);
146153
else if (message is IActorRef)
147-
AskedForDelete = (IActorRef) message;
154+
AskedForDelete = (IActorRef)message;
148155
else
149156
return false;
150157
return true;
@@ -292,7 +299,7 @@ public ChangeBehaviorInCommandHandlerFirstActor(string name) : base(name) { }
292299
protected override bool ReceiveCommand(object message)
293300
{
294301
if (CommonBehavior(message)) return true;
295-
302+
296303
if (message is Cmd)
297304
{
298305
var cmd = message as Cmd;
@@ -324,7 +331,7 @@ public ChangeBehaviorInCommandHandlerLastActor(string name) : base(name) { }
324331
protected override bool ReceiveCommand(object message)
325332
{
326333
if (CommonBehavior(message)) return true;
327-
334+
328335
if (message is Cmd)
329336
{
330337
var cmd = message as Cmd;
@@ -607,7 +614,7 @@ protected override bool ReceiveCommand(object message)
607614
internal class AsyncPersistAndPersistMixedSyncAsyncActor : ExamplePersistentActor
608615
{
609616
private int _counter = 0;
610-
617+
611618
public AsyncPersistAndPersistMixedSyncAsyncActor(string name)
612619
: base(name)
613620
{
@@ -883,7 +890,7 @@ protected override bool ReceiveCommand(object message)
883890
{
884891
if (message is string)
885892
{
886-
var s = (string) message;
893+
var s = (string)message;
887894
_probe.Tell(s);
888895
Persist(s + "-outer-1", outer =>
889896
{
@@ -915,7 +922,7 @@ protected override bool ReceiveCommand(object message)
915922
{
916923
if (message is string)
917924
{
918-
var s = (string) message;
925+
var s = (string)message;
919926
_probe.Tell(s);
920927
PersistAsync(s + "-outer-1", outer =>
921928
{
@@ -947,7 +954,7 @@ protected override bool ReceiveCommand(object message)
947954
{
948955
if (message is string)
949956
{
950-
var s = (string) message;
957+
var s = (string)message;
951958
_probe.Tell(s);
952959
Persist(s + "-outer-1", outer =>
953960
{
@@ -979,7 +986,7 @@ protected override bool ReceiveCommand(object message)
979986
{
980987
if (message is string)
981988
{
982-
var s = (string) message;
989+
var s = (string)message;
983990
_probe.Tell(s);
984991
PersistAsync(s + "-outer-async-1", outer =>
985992
{
@@ -1011,7 +1018,7 @@ protected override bool ReceiveCommand(object message)
10111018
{
10121019
if (message is string)
10131020
{
1014-
var s = (string) message;
1021+
var s = (string)message;
10151022
_probe.Tell(s);
10161023
PersistAsync(s + "-outer-async", outer =>
10171024
{
@@ -1069,7 +1076,7 @@ protected override bool ReceiveCommand(object message)
10691076
{
10701077
if (message is string)
10711078
{
1072-
var s = (string) message;
1079+
var s = (string)message;
10731080
_probe.Tell(s);
10741081
Persist(s + "-1", WeMustGoDeeper);
10751082
return true;
@@ -1113,7 +1120,7 @@ protected override bool ReceiveCommand(object message)
11131120
{
11141121
if (message is string)
11151122
{
1116-
var s = (string) message;
1123+
var s = (string)message;
11171124
_probe.Tell(s);
11181125
PersistAsync(s + "-1", WeMustGoDeeper);
11191126
return true;

src/core/Akka.Persistence.Tests/PersistentActorSpec.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Threading;
1212
using Akka.Actor;
1313
using Akka.TestKit;
14+
using FluentAssertions;
1415
using Xunit;
1516

1617
namespace Akka.Persistence.Tests
@@ -528,14 +529,14 @@ public void PersistentActor_should_allow_deeply_nested_PersistAsync_calls()
528529
var got = ReceiveN(nestedPersistAsyncs).Select(m => m.ToString()).OrderBy(m => m).ToArray();
529530
got.ShouldOnlyContainInOrder(Enumerable.Range(1, nestedPersistAsyncs).Select(i => "a-" + i).ToArray());
530531

531-
532+
532533
pref.Tell("b");
533534
pref.Tell("c");
534-
got = ReceiveN(nestedPersistAsyncs*2 + 2).Select(m => m.ToString()).OrderBy(m => m).ToArray();
535+
got = ReceiveN(nestedPersistAsyncs * 2 + 2).Select(m => m.ToString()).OrderBy(m => m).ToArray();
535536
got.ShouldOnlyContainInOrder(
536-
new [] {"b"}
537+
new[] { "b" }
537538
.Union(Enumerable.Range(1, nestedPersistAsyncs).Select(i => "b-" + i))
538-
.Union(new [] {"c"})
539+
.Union(new[] { "c" })
539540
.Union(Enumerable.Range(1, nestedPersistAsyncs).Select(i => "c-" + i))
540541
.ToArray());
541542
}
@@ -604,6 +605,20 @@ public void PersistentActor_should_be_able_to_delete_all_events()
604605
ExpectMsg<object[]>(m => m.Length == 0);
605606
}
606607

608+
[Fact]
609+
public void PersistentActor_should_not_be_able_to_delete_higher_seqnr_than_current()
610+
{
611+
var pref = ActorOf(Props.Create(() => new BehaviorOneActor(Name)));
612+
pref.Tell(new Cmd("b"));
613+
pref.Tell(GetState.Instance);
614+
ExpectMsgInOrder("a-1", "a-2", "b-1", "b-2");
615+
pref.Tell(new Delete(5)); // > current 4
616+
pref.Tell("boom"); // restart, recover
617+
ExpectMsg<DeleteMessagesFailure>(m => m.Cause.Message.Should().Contain("less than or equal to LastSequenceNr"));
618+
pref.Tell(GetState.Instance);
619+
ExpectMsgInOrder("a-1", "a-2", "b-1", "b-2");
620+
}
621+
607622
[Fact]
608623
public void PersistentActor_should_brecover_the_message_which_caused_the_restart()
609624
{
@@ -617,7 +632,7 @@ public void PersistentActor_should_be_able_to_persist_events_that_happen_during_
617632
{
618633
var persistentActor = ActorOf(Props.Create(() => new PersistInRecovery(Name)));
619634
persistentActor.Tell(GetState.Instance);
620-
ExpectAnyMsgInOrder(new[]{"a-1", "a-2", "rc-1", "rc-2" }, new[] { "a-1", "a-2", "rc-1", "rc-2", "rc-3" });
635+
ExpectAnyMsgInOrder(new[] { "a-1", "a-2", "rc-1", "rc-2" }, new[] { "a-1", "a-2", "rc-1", "rc-2", "rc-3" });
621636
persistentActor.Tell(new Cmd("invalid"));
622637
persistentActor.Tell(GetState.Instance);
623638
ExpectMsgInOrder("a-1", "a-2", "rc-1", "rc-2", "rc-3", "invalid");

src/core/Akka.Persistence/Eventsourced.Recovery.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,9 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
208208
case RecoverySuccess success:
209209
timeoutCancelable.Cancel();
210210
OnReplaySuccess();
211-
_sequenceNr = success.HighestSequenceNr;
212-
LastSequenceNr = success.HighestSequenceNr;
211+
var highestSeqNr = Math.Max(success.HighestSequenceNr, LastSequenceNr);
212+
_sequenceNr = highestSeqNr;
213+
LastSequenceNr = highestSeqNr;
213214
recoveryRunning = false;
214215
try
215216
{
@@ -255,6 +256,9 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
255256
eventSeenInInterval = false;
256257
}
257258
break;
259+
case RecoveryTick tick when tick.Snapshot:
260+
// snapshot tick, ignore
261+
break;
258262
default:
259263
StashInternally(message);
260264
break;

src/core/Akka.Persistence/Eventsourced.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,11 +448,19 @@ public void DeferAsync<TEvent>(TEvent evt, Action<TEvent> handler)
448448
/// Permanently deletes all persistent messages with sequence numbers less than or equal <paramref name="toSequenceNr"/>.
449449
/// If the delete is successful a <see cref="DeleteMessagesSuccess"/> will be sent to the actor.
450450
/// If the delete fails a <see cref="DeleteMessagesFailure"/> will be sent to the actor.
451+
///
452+
/// The given <paramref name="toSequenceNr"/> must be less than or equal to <see cref="Eventsourced.LastSequenceNr"/>, otherwise
453+
/// <see cref="DeleteMessagesFailure"/> is sent to the actor without performing the delete. All persistent
454+
/// messages may be deleted without specifying the actual sequence number by using <see cref="long.MaxValue"/>
455+
/// as the <paramref name="toSequenceNr"/>.
451456
/// </summary>
452457
/// <param name="toSequenceNr">Upper sequence number bound of persistent messages to be deleted.</param>
453458
public void DeleteMessages(long toSequenceNr)
454459
{
455-
Journal.Tell(new DeleteMessagesTo(PersistenceId, toSequenceNr, Self));
460+
if (toSequenceNr == long.MaxValue || toSequenceNr <= LastSequenceNr)
461+
Journal.Tell(new DeleteMessagesTo(PersistenceId, toSequenceNr == long.MaxValue ? LastSequenceNr : toSequenceNr, Self));
462+
else
463+
Self.Tell(new DeleteMessagesFailure(new InvalidOperationException($"toSequenceNr [{toSequenceNr}] must be less than or equal to LastSequenceNr [{LastSequenceNr}]"), toSequenceNr));
456464
}
457465

458466
/// <summary>

src/core/Akka.Persistence/JournalProtocol.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public bool Equals(DeleteMessagesSuccess other)
7070
/// Reply message to failed <see cref="Eventsourced.DeleteMessages"/> request.
7171
/// </summary>
7272
[Serializable]
73-
public sealed class DeleteMessagesFailure : IEquatable<DeleteMessagesFailure>
73+
public sealed class DeleteMessagesFailure : IEquatable<DeleteMessagesFailure>, INoSerializationVerificationNeeded //serialization verification temporary disabled because of Cause serialization issues
7474
{
7575
/// <summary>
7676
/// Initializes a new instance of the <see cref="DeleteMessagesFailure"/> class.
@@ -134,7 +134,7 @@ public sealed class DeleteMessagesTo : IJournalRequest, IEquatable<DeleteMessage
134134
/// Initializes a new instance of the <see cref="DeleteMessagesTo"/> class.
135135
/// </summary>
136136
/// <param name="persistenceId">Requesting persistent actor id.</param>
137-
/// <param name="toSequenceNr">Sequence number where replay should end (inclusive).</param>
137+
/// <param name="toSequenceNr">Sequence number where replay should end (inclusive). <see cref="long.MaxValue"/> may be used to delete all persistent messages.</param>
138138
/// <param name="persistentActor">Requesting persistent actor.</param>
139139
/// <exception cref="ArgumentNullException">
140140
/// This exception is thrown when the specified <paramref name="persistenceId"/> is undefined.

0 commit comments

Comments
 (0)