Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Configuration;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.TestKit.TestActors;
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand All @@ -25,7 +26,7 @@ namespace Akka.Remote.Tests
{
public class ActorsLeakSpec : AkkaSpec
{
public static readonly Config Confg = ConfigurationFactory.ParseString(@"
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.actor.provider = remote
akka.loglevel = INFO
akka.remote.dot-netty.tcp.applied-adapters = [trttl]
Expand All @@ -37,7 +38,7 @@ public class ActorsLeakSpec : AkkaSpec
akka.test.filter-leeway = 12 s
");

public ActorsLeakSpec(ITestOutputHelper output) : base(Confg, output)
public ActorsLeakSpec(ITestOutputHelper output) : base(Config, output)
{
}

Expand Down Expand Up @@ -87,16 +88,17 @@ private void AssertActors(ImmutableHashSet<IActorRef> expected, ImmutableHashSet
}

[Fact]
public void Remoting_must_not_leak_actors()
public async Task Remoting_must_not_leak_actors()
{
var actorRef = Sys.ActorOf(EchoActor.Props(this, true), "echo");
var echoPath = new RootActorPath(RARP.For(Sys).Provider.DefaultAddress)/"user"/"echo";

var targets = new[] {"/system/endpointManager", "/system/transports"}.Select(x =>
{
Sys.ActorSelection(x).Tell(new Identify(0));
return ExpectMsg<ActorIdentity>().Subject;
}).ToList();
var targets = await Task.WhenAll(new[] { "/system/endpointManager", "/system/transports" }.Select(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

async x =>
{
Sys.ActorSelection(x).Tell(new Identify(0));
return (await ExpectMsgAsync<ActorIdentity>()).Subject;
}));

var initialActors = targets.SelectMany(CollectLiveActors).ToImmutableHashSet();

Expand All @@ -111,14 +113,14 @@ public void Remoting_must_not_leak_actors()
{
var probe = CreateTestProbe(remoteSystem);
remoteSystem.ActorSelection(echoPath).Tell(new Identify(1), probe.Ref);
probe.ExpectMsg<ActorIdentity>().Subject.ShouldNotBe(null);
(await probe.ExpectMsgAsync<ActorIdentity>()).Subject.ShouldNotBe(null);
}
finally
{
remoteSystem.Terminate();
await ShutdownAsync(remoteSystem);
}

remoteSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(10)).ShouldBeTrue();
Assert.True(await remoteSystem.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)));
}

// Quarantine an old incarnation case
Expand All @@ -137,7 +139,7 @@ public void Remoting_must_not_leak_actors()
// the message from remote to local will cause inbound connection established
var probe = CreateTestProbe(remoteSystem);
remoteSystem.ActorSelection(echoPath).Tell(new Identify(1), probe.Ref);
probe.ExpectMsg<ActorIdentity>().Subject.ShouldNotBe(null);
(await probe.ExpectMsgAsync<ActorIdentity>()).Subject.ShouldNotBe(null);

var beforeQuarantineActors = targets.SelectMany(CollectLiveActors).ToImmutableHashSet();

Expand All @@ -147,19 +149,19 @@ public void Remoting_must_not_leak_actors()

// the message from local to remote should reuse passive inbound connection
Sys.ActorSelection(new RootActorPath(remoteAddress) / "user" / "stoppable").Tell(new Identify(1));
ExpectMsg<ActorIdentity>().Subject.ShouldNotBe(null);
(await ExpectMsgAsync<ActorIdentity>()).Subject.ShouldNotBe(null);

AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var afterQuarantineActors = targets.SelectMany(CollectLiveActors).ToImmutableHashSet();
AssertActors(beforeQuarantineActors, afterQuarantineActors);
}, TimeSpan.FromSeconds(10));
}
finally
{
remoteSystem.Terminate();
await ShutdownAsync(remoteSystem);
}
remoteSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(10)).ShouldBeTrue();
Assert.True(await remoteSystem.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)));
}

// Missing SHUTDOWN case
Expand All @@ -174,20 +176,20 @@ public void Remoting_must_not_leak_actors()
{
var probe = CreateTestProbe(remoteSystem);
remoteSystem.ActorSelection(echoPath).Tell(new Identify(1), probe.Ref);
probe.ExpectMsg<ActorIdentity>().Subject.ShouldNotBe(null);
(await probe.ExpectMsgAsync<ActorIdentity>()).Subject.ShouldNotBe(null);

// This will make sure that no SHUTDOWN message gets through
RARP.For(Sys).Provider.Transport.ManagementCommand(new ForceDisassociate(remoteAddress))
.Wait(TimeSpan.FromSeconds(3)).ShouldBeTrue();
Assert.True(await RARP.For(Sys).Provider.Transport.ManagementCommand(new ForceDisassociate(remoteAddress))
.AwaitWithTimeout(TimeSpan.FromSeconds(3)));
}
finally
{
remoteSystem.Terminate();
await ShutdownAsync(remoteSystem);
}

EventFilter.Warning(contains: "Association with remote system").ExpectOne(() =>
await EventFilter.Warning(contains: "Association with remote system").ExpectOneAsync(async () =>
{
remoteSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(10)).ShouldBeTrue();
Assert.True(await remoteSystem.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)));
});
}

Expand All @@ -204,37 +206,37 @@ public void Remoting_must_not_leak_actors()
var probe = CreateTestProbe(idleRemoteSystem);

idleRemoteSystem.ActorSelection(echoPath).Tell(new Identify(1), probe.Ref);
probe.ExpectMsg<ActorIdentity>().Subject.ShouldNotBe(null);
(await probe.ExpectMsgAsync<ActorIdentity>()).Subject.ShouldNotBe(null);

// Watch a remote actor - this results in system message traffic
Sys.ActorSelection(new RootActorPath(idleRemoteAddress) / "user" / "stoppable").Tell(new Identify(1));
var remoteActor = ExpectMsg<ActorIdentity>().Subject;
var remoteActor = (await ExpectMsgAsync<ActorIdentity>()).Subject;
Watch(remoteActor);
remoteActor.Tell("stop");
ExpectTerminated(remoteActor);
await ExpectTerminatedAsync(remoteActor);
// All system messages have been acked now on this side

// This will make sure that no SHUTDOWN message gets through
RARP.For(Sys).Provider.Transport.ManagementCommand(new ForceDisassociate(idleRemoteAddress))
.Wait(TimeSpan.FromSeconds(3)).ShouldBeTrue();
Assert.True(await RARP.For(Sys).Provider.Transport.ManagementCommand(new ForceDisassociate(idleRemoteAddress))
.AwaitWithTimeout(TimeSpan.FromSeconds(3)));
}
finally
{
idleRemoteSystem.Terminate();
await ShutdownAsync(idleRemoteSystem);
}

EventFilter.Warning(contains: "Association with remote system").ExpectOne(() =>
await EventFilter.Warning(contains: "Association with remote system").ExpectOneAsync(async () =>
{
idleRemoteSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(10)).ShouldBeTrue();
Assert.True(await idleRemoteSystem.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)));
});

/*
* Wait for the ReliableDeliverySupervisor to receive its "TooLongIdle" message,
* which will throw a HopelessAssociation wrapped around a TimeoutException.
*/
EventFilter.Exception<TimeoutException>().ExpectOne(() => { });
await EventFilter.Exception<TimeoutException>().ExpectOneAsync(() => { });

AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
AssertActors(initialActors, targets.SelectMany(CollectLiveActors).ToImmutableHashSet());
}, 10.Seconds());
Expand Down