Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private static ActorSystem CreateAndStartSystem(string name, Config withFallback
/// </returns>
public abstract Task Terminate();

internal abstract void FinalTerminate();
internal abstract Task FinalTerminate();

/// <summary>
/// Returns a task which will be completed after the <see cref="ActorSystem"/> has been
Expand Down
21 changes: 14 additions & 7 deletions src/core/Akka/Actor/CoordinatedShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// <summary>
/// The <see cref="ActorSystem"/>
/// </summary>
public ExtendedActorSystem System { get; }
public ExtendedActorSystem System { get; private set; }

/// <summary>
/// The set of named <see cref="Phase"/>s that will be executed during coordinated shutdown.
Expand All @@ -261,7 +261,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// <summary>
/// INTERNAL API
/// </summary>
internal ILoggingAdapter Log { get; }
internal ILoggingAdapter Log { get; private set; }

private readonly HashSet<string> _knownPhases;

Expand All @@ -270,7 +270,7 @@ private ClusterJoinUnsuccessfulReason() { }
/// </summary>
internal readonly List<string> OrderedPhases;

private readonly ConcurrentBag<Func<Task<Done>>> _clrShutdownTasks = new ConcurrentBag<Func<Task<Done>>>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real cause of the memory leak. ConcurrentBag uses ThreadLocal that pinned the lambdas in memory, preventing them from being GC-ed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yikes

private readonly ConcurrentSet<Func<Task<Done>>> _clrShutdownTasks = new ConcurrentSet<Func<Task<Done>>>();
private readonly ConcurrentDictionary<string, ImmutableList<(string, Func<Task<Done>>)>> _tasks = new ConcurrentDictionary<string, ImmutableList<(string, Func<Task<Done>>)>>();
private readonly AtomicReference<Reason> _runStarted = new AtomicReference<Reason>(null);
private readonly AtomicBoolean _clrHooksStarted = new AtomicBoolean(false);
Expand Down Expand Up @@ -335,7 +335,7 @@ internal void AddClrShutdownHook(Func<Task<Done>> hook)
{
if (!_clrHooksStarted)
{
_clrShutdownTasks.Add(hook);
_clrShutdownTasks.TryAdd(hook);
}
}

Expand Down Expand Up @@ -653,13 +653,16 @@ internal static void InitPhaseActorSystemTerminate(ActorSystem system, Config co

if (terminateActorSystem)
{
system.FinalTerminate();
return system.Terminate().ContinueWith(tr =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This potentially an infinite loop call, if the call came from ActorSystem.Terminate() in the first place.

return system.FinalTerminate().ContinueWith(tr =>
{
if (exitClr && !coord._runningClrHook)
{
Environment.Exit(0);
}

coord.System = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clear the reference to the ActorSystem to prevent a circular reference, preventing the ActorSystem from being garbage collected afterward.

coord.Log = null;
coord._tasks.Clear(); // Clear the dictionary, just in case it is retained in memory
return Done.Instance;
});
}
Expand Down Expand Up @@ -691,7 +694,11 @@ internal static void InitClrHook(ActorSystem system, Config conf, CoordinatedShu
var exitTask = TerminateOnClrExit(coord);
// run all hooks during termination sequence
AppDomain.CurrentDomain.ProcessExit += exitTask;
system.WhenTerminated.ContinueWith(tr => { AppDomain.CurrentDomain.ProcessExit -= exitTask; });
system.WhenTerminated.ContinueWith(tr =>
{
AppDomain.CurrentDomain.ProcessExit -= exitTask;
coord._clrShutdownTasks.Clear(); // Clear the tasks, just in case it is retained in memory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure that ConcurrentSet does not use ThreadLocal, but lets add this just in case that changes in the future.

});

coord.AddClrShutdownHook(() =>
{
Expand Down
11 changes: 5 additions & 6 deletions src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -525,20 +525,19 @@ public override Task Terminate()
{
if(Settings.CoordinatedShutdownRunByActorSystemTerminate)
{
CoordinatedShutdown.Get(this).Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);
} else
{
FinalTerminate();
return CoordinatedShutdown.Get(this)
.Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);
}
return WhenTerminated;
return FinalTerminate();
}

internal override void FinalTerminate()
internal override Task FinalTerminate()
{
Log.Debug("System shutdown initiated");
if (!Settings.LogDeadLettersDuringShutdown && _logDeadLetterListener != null)
Stop(_logDeadLetterListener);
_provider.Guardian.Stop();
return WhenTerminated;
}

/// <summary>
Expand Down