Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/core/Akka.Remote.Tests/RemoteDeathWatchSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ protected override async Task AfterAllAsync()
}

[Fact]
public void Must_receive_Terminated_when_system_of_deserialized_ActorRef_is_not_running()
public async Task Must_receive_Terminated_when_system_of_deserialized_ActorRef_is_not_running()
{
var probe = CreateTestProbe();
Sys.EventStream.Subscribe(probe.Ref, typeof(QuarantinedEvent));
var rarp = RARP.For(Sys).Provider;
// pick an unused port (not going to use a socket address generator here; just a port not used by either actor system)
int port = rarp.DefaultAddress.Port.Value;
while (port == rarp.DefaultAddress.Port.Value || port == 2666)
var port = rarp.DefaultAddress.Port;
while (port == rarp.DefaultAddress.Port || port == 2666)
port = ThreadLocalRandom.Current.Next(1, 65535);
// simulate de-serialized ActorRef
var @ref = rarp.ResolveActorRef($"akka.tcp://OtherSystem@localhost:{port}/user/foo/bar#1752527294");
Expand All @@ -83,17 +83,17 @@ public void Must_receive_Terminated_when_system_of_deserialized_ActorRef_is_not_
};
Sys.ActorOf(Props.Create(() => new Act(act)).WithDeploy(Deploy.Local));

ExpectMsg(@ref, TimeSpan.FromSeconds(20));
await ExpectMsgAsync(@ref, TimeSpan.FromSeconds(20));
// we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published
probe.ExpectNoMsg(TimeSpan.FromSeconds(3));
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(3));
// The following verifies that re-delivery of Watch message is stopped.
// It was observed as periodic logging of "address is now gated" when the gate was lifted.
Sys.EventStream.Subscribe(probe.Ref, typeof(Warning));
probe.ExpectNoMsg(TimeSpan.FromSeconds(rarp.RemoteSettings.RetryGateClosedFor.TotalSeconds*2));
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(rarp.RemoteSettings.RetryGateClosedFor.TotalSeconds*2));
}

[Fact]
public void Must_receive_terminated_when_watched_node_is_unknown_host()
public async Task Must_receive_terminated_when_watched_node_is_unknown_host()
{
var path = new RootActorPath(new Address("akka.tcp", Sys.Name, "unknownhost", 2552)) / "user" / "subject";
var rarp = RARP.For(Sys).Provider;
Expand All @@ -111,27 +111,27 @@ public void Must_receive_terminated_when_watched_node_is_unknown_host()

Sys.ActorOf(Props.Create(() => new Act(act)).WithDeploy(Deploy.Local), "observer2");

ExpectMsg(path, TimeSpan.FromSeconds(60));
await ExpectMsgAsync(path, TimeSpan.FromSeconds(60));
}

[Fact]
public void Must_receive_ActorIdentity_null_when_identified_node_is_unknown_host()
public async Task Must_receive_ActorIdentity_null_when_identified_node_is_unknown_host()
{
var path = new RootActorPath(new Address("akka.tcp", Sys.Name, "unknownhost2", 2552)) / "user" / "subject";
Sys.ActorSelection(path).Tell(new Identify(path));
var identify = ExpectMsg<ActorIdentity>(TimeSpan.FromSeconds(60));
var identify = await ExpectMsgAsync<ActorIdentity>(TimeSpan.FromSeconds(60));
identify.Subject.ShouldBe(null);
identify.MessageId.ShouldBe(path);
}

[Fact]
public void Must_quarantine_systems_after_unsuccessful_system_message_delivery_if_have_not_communicated_before()
public async Task Must_quarantine_systems_after_unsuccessful_system_message_delivery_if_have_not_communicated_before()
{
// Synthesize an ActorRef to a remote system this one has never talked to before.
// This forces ReliableDeliverySupervisor to start with unknown remote system UID.
var rarp = RARP.For(Sys).Provider;
int port = rarp.DefaultAddress.Port.Value;
while (port == rarp.DefaultAddress.Port.Value || port == 2666)
var port = rarp.DefaultAddress.Port;
while (port == rarp.DefaultAddress.Port || port == 2666)
port = ThreadLocalRandom.Current.Next(1, 65535);

var extinctPath = new RootActorPath(new Address("akka.tcp", "extinct-system", "localhost", port)) / "user" / "noone";
Expand All @@ -143,9 +143,9 @@ public void Must_quarantine_systems_after_unsuccessful_system_message_delivery_i
probe.Watch(extinctRef);
probe.Unwatch(extinctRef);

probe.ExpectNoMsg(TimeSpan.FromSeconds(5));
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(5));
Sys.EventStream.Subscribe(probe.Ref, typeof(Warning));
probe.ExpectNoMsg(TimeSpan.FromSeconds(rarp.RemoteSettings.RetryGateClosedFor.TotalSeconds * 2));
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(rarp.RemoteSettings.RetryGateClosedFor.TotalSeconds * 2));
}
}
}
Expand Down