Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 44 additions & 45 deletions src/core/Akka.Tests/Event/EventStreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Linq;
using Akka.Util.Internal;
using Xunit;
using System.Threading.Tasks;

namespace Akka.Tests.Event
{
Expand Down Expand Up @@ -61,25 +62,25 @@ private class CC { }
private class CCATBT : CC, ATT, BTT { }

[Fact]
public void Manage_subscriptions()
public async Task Manage_subscriptions()
{

var bus = new EventStream(true);
bus.StartUnsubscriber(Sys.AsInstanceOf<ActorSystemImpl>());
bus.Subscribe(TestActor, typeof(M));

bus.Publish(new M { Value = 42 });
ExpectMsg(new M { Value = 42 });
await ExpectMsgAsync(new M { Value = 42 });
bus.Unsubscribe(TestActor);
bus.Publish(new M { Value = 43 });
ExpectNoMsg(TimeSpan.FromSeconds(1));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}

[Fact]
public void Not_allow_null_as_subscriber()
{
var bus = new EventStream(true);
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
{
bus.Subscribe(null, typeof(M));
});
Expand All @@ -89,18 +90,18 @@ public void Not_allow_null_as_subscriber()
public void Not_allow_null_as_unsubscriber()
{
var bus = new EventStream(true);
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're changing XAssert to Assert, you might as well do it to all of it

Copy link
Contributor Author

@eaba eaba Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Sounds good

{
bus.Unsubscribe(null, typeof(M));
});
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
{
bus.Unsubscribe(null);
});
}

[Fact]
public void Be_able_to_log_unhandled_messages()
public async Task Be_able_to_log_unhandled_messages()
{
using (var system = ActorSystem.Create("EventStreamSpecUnhandled", GetDebugUnhandledMessagesConfig()))
{
Expand All @@ -110,7 +111,7 @@ public void Be_able_to_log_unhandled_messages()

system.EventStream.Publish(msg);

var debugMsg = ExpectMsg<Debug>();
var debugMsg = await ExpectMsgAsync<Debug>();

debugMsg.Message.ToString().StartsWith("Unhandled message from").ShouldBeTrue();
debugMsg.Message.ToString().EndsWith(": 42").ShouldBeTrue();
Expand All @@ -121,7 +122,7 @@ public void Be_able_to_log_unhandled_messages()
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/3267
/// </summary>
[Fact]
public void Bugfix3267_able_to_log_unhandled_messages_with_nosender()
public async Task Bugfix3267_able_to_log_unhandled_messages_with_nosender()
{
using (var system = ActorSystem.Create("EventStreamSpecUnhandled", GetDebugUnhandledMessagesConfig()))
{
Expand All @@ -132,15 +133,15 @@ public void Bugfix3267_able_to_log_unhandled_messages_with_nosender()

system.EventStream.Publish(msg);

var debugMsg = ExpectMsg<Debug>();
var debugMsg = await ExpectMsgAsync<Debug>();

debugMsg.Message.ToString().StartsWith("Unhandled message from").ShouldBeTrue();
debugMsg.Message.ToString().EndsWith(": 42").ShouldBeTrue();
}
}

[Fact]
public void Manage_sub_channels_using_classes()
public async Task Manage_sub_channels_using_classes()
{
var a = new A();
var b1 = new B1();
Expand All @@ -150,24 +151,24 @@ public void Manage_sub_channels_using_classes()
bus.Subscribe(TestActor, typeof(B2));
bus.Publish(c);
bus.Publish(b2);
ExpectMsg(b2);
await ExpectMsgAsync(b2);
bus.Subscribe(TestActor, typeof(A));
bus.Publish(c);
ExpectMsg(c);
await ExpectMsgAsync(c);
bus.Publish(b1);
ExpectMsg(b1);
await ExpectMsgAsync(b1);

bus.Unsubscribe(TestActor, typeof(B1));
bus.Publish(c); //should not publish
bus.Publish(b2); //should publish
bus.Publish(a); //should publish
ExpectMsg(b2);
ExpectMsg(a);
ExpectNoMsg(TimeSpan.FromSeconds(1));
await ExpectMsgAsync(b2);
await ExpectMsgAsync(a);
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}

[Fact(DisplayName = "manage sub-channels using classes and traits (update on subscribe)")]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -183,11 +184,11 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe
es.Subscribe(a4.Ref, typeof(CCATBT)).ShouldBeTrue();
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectMsg((object)tm1);
a3.ExpectMsg((object)tm2);
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectMsgAsync((object)tm1);
await a3.ExpectMsgAsync((object)tm2);
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeTrue();
Expand All @@ -196,7 +197,7 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe

//"manage sub-channels using classes and traits (update on unsubscribe)"
[Fact]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -213,18 +214,18 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscri
es.Unsubscribe(a3.Ref, typeof(CC));
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectNoMsg(TimeSpan.FromSeconds(1));
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeFalse();
es.Unsubscribe(a4.Ref, typeof(CCATBT)).ShouldBeTrue();
}

[Fact]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe_all()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe_all()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -241,10 +242,10 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscri
es.Unsubscribe(a3.Ref).ShouldBeTrue();
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectNoMsg(TimeSpan.FromSeconds(1));
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeFalse();
Expand All @@ -262,12 +263,12 @@ public SetTarget(IActorRef @ref)
}

[Fact]
public void Manage_log_levels()
public async Task Manage_log_levels()
{
var bus = new EventStream(false);
bus.StartDefaultLoggers((ActorSystemImpl)Sys);
bus.Publish(new SetTarget(TestActor));
ExpectMsg("OK", TimeSpan.FromSeconds(5));
await ExpectMsgAsync("OK", TimeSpan.FromSeconds(5));

verifyLevel(bus, LogLevel.InfoLevel);
bus.SetLogLevel(LogLevel.WarningLevel);
Expand Down Expand Up @@ -304,28 +305,26 @@ private static string GetDebugUnhandledMessagesConfig()
".Replace("%logger%", typeof(MyLog).AssemblyQualifiedName);
}

public class MyLog : UntypedActor
public class MyLog : ReceiveActor
{
private IActorRef dst = Context.System.DeadLetters;

protected override void OnReceive(object message)
public MyLog()
{
PatternMatch.Match(message)
.With<InitializeLogger>(m =>
Receive<InitializeLogger>(m =>
{
var bus = m.LoggingBus;
bus.Subscribe(this.Self, typeof(SetTarget));
bus.Subscribe(this.Self, typeof(UnhandledMessage));

Sender.Tell(new LoggerInitialized());
})
.With<SetTarget>(m =>
});
Receive<SetTarget>(m =>
{
dst = m.Ref;
dst.Tell("OK");
})
.With<LogEvent>(m => dst.Tell(m))
.With<UnhandledMessage>(m => dst.Tell(m));
});
Receive<LogEvent>(m => dst.Tell(m));
Receive<UnhandledMessage>(m => dst.Tell(m));
}
}

Expand Down