Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Xunit;
using FluentAssertions;
using FluentAssertions.Extensions;
using System.Threading.Tasks;

namespace Akka.Tests.Actor
{
Expand Down Expand Up @@ -45,7 +46,7 @@ public DeadLetterSupressionSpec()
}

[Fact]
public void Must_suppress_message_from_default_dead_letters_logging_sent_to_deadActor()
public async Task Must_suppress_message_from_default_dead_letters_logging_sent_to_deadActor()
{
var deadListener = CreateTestProbe();
Sys.EventStream.Subscribe(deadListener.Ref, typeof(DeadLetter));
Expand All @@ -59,46 +60,46 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
deadActor.Tell(new SuppressedMessage());
deadActor.Tell(new NormalMessage());

var deadLetter = deadListener.ExpectMsg<DeadLetter>();
var deadLetter = await deadListener.ExpectMsgAsync<DeadLetter>();
deadLetter.Message.Should().BeOfType<NormalMessage>();
deadLetter.Sender.Should().Be(TestActor);
deadLetter.Recipient.Should().Be(deadActor);
deadListener.ExpectNoMsg(200.Milliseconds());
await deadListener.ExpectNoMsgAsync(200.Milliseconds());

var suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
var suppressedDeadLetter = await suppressedListener.ExpectMsgAsync<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
suppressedDeadLetter.Sender.Should().Be(TestActor);
suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);
suppressedListener.ExpectNoMsg(200.Milliseconds());
await suppressedListener.ExpectNoMsgAsync(200.Milliseconds());

var allSuppressedDeadLetter = allListener.ExpectMsg<SuppressedDeadLetter>();
var allSuppressedDeadLetter = await allListener.ExpectMsgAsync<SuppressedDeadLetter>();
allSuppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
allSuppressedDeadLetter.Sender.Should().Be(TestActor);
allSuppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

var allDeadLetter = allListener.ExpectMsg<DeadLetter>();
var allDeadLetter = await allListener.ExpectMsgAsync<DeadLetter>();
allDeadLetter.Message.Should().BeOfType<NormalMessage>();
allDeadLetter.Sender.Should().Be(TestActor);
allDeadLetter.Recipient.Should().Be(deadActor);

allListener.ExpectNoMsg(200.Milliseconds());
await allListener.ExpectNoMsgAsync(200.Milliseconds());

// unwrap for ActorSelection
Sys.ActorSelection(deadActor.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(deadActor.Path).Tell(new NormalMessage());

// the recipient ref isn't the same as deadActor here so only checking the message
deadLetter = deadListener.ExpectMsg<DeadLetter>();//
deadLetter = await deadListener.ExpectMsgAsync<DeadLetter>();//
deadLetter.Message.Should().BeOfType<NormalMessage>();
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter = await suppressedListener.ExpectMsgAsync<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();

deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
await deadListener.ExpectNoMsgAsync(200.Milliseconds());
await suppressedListener.ExpectNoMsgAsync(200.Milliseconds());
}

[Fact]
public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead_letters()
public async Task Must_suppress_message_from_default_dead_letters_logging_sent_to_dead_letters()
{
var deadListener = CreateTestProbe();
Sys.EventStream.Subscribe(deadListener.Ref, typeof(DeadLetter));
Expand All @@ -112,46 +113,46 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
Sys.DeadLetters.Tell(new SuppressedMessage());
Sys.DeadLetters.Tell(new NormalMessage());

var deadLetter = deadListener.ExpectMsg<DeadLetter>(200.Milliseconds());
var deadLetter = await deadListener.ExpectMsgAsync<DeadLetter>(200.Milliseconds());
deadLetter.Message.Should().BeOfType<NormalMessage>();
deadLetter.Sender.Should().Be(TestActor);
deadLetter.Recipient.Should().Be(Sys.DeadLetters);

var suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>(200.Milliseconds());
var suppressedDeadLetter = await suppressedListener.ExpectMsgAsync<SuppressedDeadLetter>(200.Milliseconds());
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
suppressedDeadLetter.Sender.Should().Be(TestActor);
suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

var allSuppressedDeadLetter = allListener.ExpectMsg<SuppressedDeadLetter>(200.Milliseconds());
var allSuppressedDeadLetter = await allListener.ExpectMsgAsync<SuppressedDeadLetter>(200.Milliseconds());
allSuppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
allSuppressedDeadLetter.Sender.Should().Be(TestActor);
allSuppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

var allDeadLetter = allListener.ExpectMsg<DeadLetter>(200.Milliseconds());
var allDeadLetter = await allListener.ExpectMsgAsync<DeadLetter>(200.Milliseconds());
allDeadLetter.Message.Should().BeOfType<NormalMessage>();
allDeadLetter.Sender.Should().Be(TestActor);
allDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

Thread.Sleep(200);
deadListener.ExpectNoMsg(TimeSpan.Zero);
suppressedListener.ExpectNoMsg(TimeSpan.Zero);
allListener.ExpectNoMsg(TimeSpan.Zero);
await Task.Delay(200);
await deadListener.ExpectNoMsgAsync(TimeSpan.Zero);
await suppressedListener.ExpectNoMsgAsync(TimeSpan.Zero);
await allListener.ExpectNoMsgAsync(TimeSpan.Zero);

// unwrap for ActorSelection
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new NormalMessage());

deadLetter = deadListener.ExpectMsg<DeadLetter>();
deadLetter = await deadListener.ExpectMsgAsync<DeadLetter>();
deadLetter.Message.Should().BeOfType<NormalMessage>();
deadLetter.Sender.Should().Be(TestActor);
deadLetter.Recipient.Should().Be(Sys.DeadLetters);
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter = await suppressedListener.ExpectMsgAsync<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
suppressedDeadLetter.Sender.Should().Be(TestActor);
suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
await deadListener.ExpectNoMsgAsync(200.Milliseconds());
await suppressedListener.ExpectNoMsgAsync(200.Milliseconds());
}
}
}
29 changes: 15 additions & 14 deletions src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -80,38 +81,38 @@ private string ExpectedUnhandledLogMessage(int count) =>


[Fact]
public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letters_and_then_re_enable()
public async Task Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letters_and_then_re_enable()
{
EventFilter
await EventFilter
.Info(start: ExpectedDeadLettersLogMessage(1))
.Expect(1, () => _deadActor.Tell(1));
.ExpectAsync(1, () => _deadActor.Tell(1));

EventFilter
await EventFilter
.Info(start: ExpectedDroppedLogMessage(2))
.Expect(1, () => _droppingActor.Tell(2));
.ExpectAsync(1, () => _droppingActor.Tell(2));

EventFilter
await EventFilter
.Info(start: ExpectedUnhandledLogMessage(3))
.Expect(1, () => _unhandledActor.Tell(3));
.ExpectAsync(1, () => _unhandledActor.Tell(3));

EventFilter
await EventFilter
.Info(start: ExpectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next")
.Expect(1, () => _deadActor.Tell(4));
.ExpectAsync(1, () => _deadActor.Tell(4));
_deadActor.Tell(5);
_droppingActor.Tell(6);

// let suspend-duration elapse
Thread.Sleep(2050);
await Task.Delay(2050);

// re-enabled
EventFilter
await EventFilter
.Info(start: ExpectedDeadLettersLogMessage(7) + ", of which 2 were not logged")
.Expect(1, () => _deadActor.Tell(7));
.ExpectAsync(1, () => _deadActor.Tell(7));

// reset count
EventFilter
await EventFilter
.Info(start: ExpectedDeadLettersLogMessage(1))
.Expect(1, () => _deadActor.Tell(8));
.ExpectAsync(1, () => _deadActor.Tell(8));
}
}
}