Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 0 additions & 145 deletions src/Akka.Cluster.Hosting/AkkaClusterHealthCheck.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,151 +20,6 @@ public static IReadOnlyDictionary<string, object> DumpClusterState(this ClusterE
}
}

/// <summary>
/// A programmable health check for Akka.Cluster. Make sure you read the documentation carefully.
///
/// You often will not need this - Akka.NET's Split Brain Resolvers (see https://getakka.net/articles/clustering/split-brain-resolver.html)
/// typically handle most scenarios that require intervention automatically; they're enabled by default; and they will automatically
/// kill unhealthy Akka.NET processes in order to ensure that there is a single, unified Akka.NET cluster afterwards.
///
/// This component is useful for scenarios where you want to trigger node-death when your cluster is missing enough
/// nodes to fulfill its duties and can't really function anyway.
/// </summary>
/// <remarks>
/// An instance keeps mutable evaluation state between calls to <see cref="CheckHealthAsync"/>: whether this node
/// has joined, whether the optional pre-condition has been met, and when the unhealthy predicate first became true.
/// That state is not synchronized by this class.
/// </remarks>
internal sealed class AkkaClusterHealthCheck : IAkkaHealthCheck
{
    /// <summary>
    /// The default value for <see cref="TriggerHealthCheckFailureThreshold"/>. 20 seconds.
    /// </summary>
    public static readonly TimeSpan DefaultFailureEvaluationThreshold = TimeSpan.FromSeconds(20);

    // Latched to true the first time the self member is observed as Up or WeaklyUp.
    private bool _weHaveJoined;

    // Latched to true the first time DontEvaluateUntilTrue returns true.
    private bool _evaluationConditionSatisfied;

    // UTC timestamp of the first consecutive unhealthy observation; null while healthy.
    private DateTime? _failureInitiallyTriggered;

    /// <summary>True when the join gate is disabled or this node has been seen as joined.</summary>
    internal bool PassedJoinCondition => !DontEvaluateUntilWeHaveJoined || _weHaveJoined;

    /// <summary>True once the <see cref="DontEvaluateUntilTrue"/> pre-condition has been met.</summary>
    internal bool PassedEvalCondition => _evaluationConditionSatisfied;

    /// <summary>True when both gates are open and <see cref="UnhealthyWhenTrue"/> may be evaluated.</summary>
    internal bool CanEvaluateFailure => PassedEvalCondition && PassedJoinCondition;

    /// <summary>
    /// Determines whether the unhealthy condition has persisted for at least
    /// <see cref="TriggerHealthCheckFailureThreshold"/>.
    /// </summary>
    /// <param name="now">The current time, compared against the recorded start of the failure.</param>
    /// <returns><c>false</c> when no failure is currently being tracked.</returns>
    internal bool ShouldTriggerFailure(DateTime now) =>
        _failureInitiallyTriggered is { } failingSince &&
        now - failingSince >= TriggerHealthCheckFailureThreshold;

    /// <summary>
    /// Creates a new <see cref="AkkaClusterHealthCheck"/> configuration using
    /// <see cref="DefaultFailureEvaluationThreshold"/>, waiting for the node to join, and no extra pre-condition.
    /// </summary>
    /// <param name="unhealthyWhenTrue">The predicate function that RETURNS TRUE when the cluster is NOT HEALTHY.</param>
    public AkkaClusterHealthCheck(Predicate<ClusterEvent.CurrentClusterState> unhealthyWhenTrue)
        : this(unhealthyWhenTrue, DefaultFailureEvaluationThreshold, true, null)
    {
    }

    /// <summary>
    /// Creates a new <see cref="AkkaClusterHealthCheck"/> configuration.
    /// </summary>
    /// <param name="unhealthyWhenTrue">The predicate function that RETURNS TRUE when the cluster is NOT HEALTHY.</param>
    /// <param name="triggerHealthCheckFailureThreshold">Failure checking threshold - the <see cref="UnhealthyWhenTrue"/> function needs to be true for this amount of time.</param>
    /// <param name="dontEvaluateUntilWeHaveJoined">Don't start evaluating failure conditions until we join the cluster.</param>
    /// <param name="dontEvaluateUntil">Optional. Don't start evaluating failure conditions until this pre-condition is met.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="triggerHealthCheckFailureThreshold"/> is infinite, or is not greater than one second.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="unhealthyWhenTrue"/> is <c>null</c>.</exception>
    public AkkaClusterHealthCheck(Predicate<ClusterEvent.CurrentClusterState> unhealthyWhenTrue,
        TimeSpan triggerHealthCheckFailureThreshold, bool dontEvaluateUntilWeHaveJoined,
        Predicate<ClusterEvent.CurrentClusterState>? dontEvaluateUntil)
    {
        // Timeout.InfiniteTimeSpan is a negative value (-1ms), so it must be tested BEFORE the
        // lower-bound check; otherwise the range check swallows it and this message is unreachable.
        if (triggerHealthCheckFailureThreshold == Timeout.InfiniteTimeSpan)
            throw new ArgumentOutOfRangeException(nameof(triggerHealthCheckFailureThreshold),
                "Cannot set an infinite value. Pick something reasonable.");

        // check the range on the triggerHealthCheckFailureThreshold
        if (triggerHealthCheckFailureThreshold <= TimeSpan.FromSeconds(1))
            throw new ArgumentOutOfRangeException(nameof(triggerHealthCheckFailureThreshold),
                "For your own good, you really need a failure threshold value greater than 1s. 10s or 20s would be much better.");

        // Fail fast here rather than with a NullReferenceException on some later health probe.
        UnhealthyWhenTrue = unhealthyWhenTrue ?? throw new ArgumentNullException(nameof(unhealthyWhenTrue));
        TriggerHealthCheckFailureThreshold = triggerHealthCheckFailureThreshold;

        DontEvaluateUntilWeHaveJoined = dontEvaluateUntilWeHaveJoined;

        // No pre-condition supplied: treat it as immediately satisfied.
        DontEvaluateUntilTrue = dontEvaluateUntil ?? (_ => true);
    }

    /// <summary>
    /// When set to <c>true</c>, prevents this check from even being evaluated until after we've successfully
    /// joined the cluster (generally, a good idea to leave on.)
    /// </summary>
    /// <remarks>
    /// Defaults to <c>true</c>.
    /// </remarks>
    public bool DontEvaluateUntilWeHaveJoined { get; set; }

    /// <summary>
    /// Optional.
    ///
    /// When set, we can't begin evaluating this health check until certain conditions within the cluster are met.
    ///
    /// Minimum number of members, members of a certain role type, etc.
    /// </summary>
    public Predicate<ClusterEvent.CurrentClusterState> DontEvaluateUntilTrue { get; set; }

    /// <summary>
    /// Predicate function for determining when the cluster isn't healthy.
    /// </summary>
    public Predicate<ClusterEvent.CurrentClusterState> UnhealthyWhenTrue { get; }

    /// <summary>
    /// Immediately triggering a health check failure the instant a problem is detected within
    /// an Akka.NET cluster is a truly awful idea - it will render your system's partition tolerance
    /// to zero and make the entire system overreact at a massive scale if blips or other small,
    /// inevitable, easy-to-overcome network problems occur.
    ///
    /// SET THIS VALUE TO SOMETHING REASONABLE. 20s is the default value. It's a good default. Be
    /// very, very careful going any lower than that.
    /// </summary>
    public TimeSpan TriggerHealthCheckFailureThreshold { get; }

    /// <summary>
    /// Evaluates the cluster state and reports a failure only after <see cref="UnhealthyWhenTrue"/> has
    /// held for at least <see cref="TriggerHealthCheckFailureThreshold"/>.
    /// </summary>
    /// <param name="context">Supplies the actor system to inspect and the registration whose failure status is reported.</param>
    /// <param name="cancellationToken">Not observed; the check completes synchronously.</param>
    /// <returns>
    /// A healthy result while the gates are closed, while the cluster is healthy, or while the unhealthy
    /// condition has not yet lasted long enough; otherwise a result carrying the registration's failure status.
    /// </returns>
    public Task<HealthCheckResult> CheckHealthAsync(AkkaHealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var cluster = Cluster.Get(context.ActorSystem);

        if (!_weHaveJoined)
        {
            // satisfy "ARE WE UP?" criteria:
            // only two acceptable transitions for passing this check are Up or WeaklyUp
            _weHaveJoined = cluster.SelfMember.Status is MemberStatus.Up or MemberStatus.WeaklyUp;
        }

        // Read the state once so both predicates and the reported data describe the same view,
        // in case the cluster view is refreshed between reads.
        var state = cluster.State;

        if (!_evaluationConditionSatisfied)
        {
            // evaluate any pre-conditions (this only has to happen once)
            _evaluationConditionSatisfied = DontEvaluateUntilTrue(state);
        }

        if (!CanEvaluateFailure) return Task.FromResult(HealthCheckResult.Healthy());

        // Single clock read: the threshold decision and the reported duration use the same instant.
        var now = DateTime.UtcNow;
        var areInFailure = UnhealthyWhenTrue(state);
        switch (areInFailure, _failureInitiallyTriggered)
        {
            case (false, not null):
                // we were in failure; now we are not
                _failureInitiallyTriggered = null; // clear the status
                break;
            case (true, null):
                // first time entering failure
                _failureInitiallyTriggered = now;
                break;
            case (true, not null) when ShouldTriggerFailure(now):
                // time to signal failure publicly, which might have repercussions
                return Task.FromResult(new HealthCheckResult(context.Registration.FailureStatus,
                    $"Cluster has been unhealthy for [{now - _failureInitiallyTriggered:g}]",
                    data: state.DumpClusterState()));
        }

        // don't know enough
        return Task.FromResult(HealthCheckResult.Healthy(data: state.DumpClusterState()));
    }
}

/// <summary>
/// Checks to see if we've joined a cluster and have been marked as <see cref="MemberStatus.Up"/>
/// or <see cref="MemberStatus.WeaklyUp"/>
Expand Down
14 changes: 12 additions & 2 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -696,16 +696,26 @@ public static AkkaConfigurationBuilder WithClustering(
/// which will return `Unhealthy` until we have successfully joined a cluster. Used to prevent nodes
/// from accepting load-balancer traffic until we have access to the cluster.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="failureStatus">
/// The <see cref="HealthStatus"/> that should be reported upon failure of the health check. If the provided value
/// is <c>null</c>, then <see cref="HealthStatus.Unhealthy"/> will be reported.
/// </param>
/// <param name="tags">A list of tags that can be used for filtering health checks.</param>
/// <remarks>
/// If you need to customize the readiness check, you can use <see cref="AkkaConfigurationBuilder.WithHealthCheck(AkkaHealthCheckRegistration)"/> to
/// register your own <see cref="AkkaHealthCheckRegistration"/> with the <see cref="AkkaClusterReadinessCheck"/>.
/// </remarks>
public static AkkaConfigurationBuilder WithAkkaClusterReadinessCheck(
this AkkaConfigurationBuilder builder)
this AkkaConfigurationBuilder builder,
HealthStatus? failureStatus = null,
IEnumerable<string>? tags = null)
{
string[] defaultTags = ["akka", "ready", "akka.cluster"];

// add the default cluster readiness check
return builder.WithHealthCheck(new AkkaHealthCheckRegistration("akka.cluster.join", new AkkaClusterReadinessCheck(),
HealthStatus.Unhealthy, ["ready", "akka.cluster"]));
failureStatus ?? HealthStatus.Unhealthy, tags ?? defaultTags));
}

public static AkkaConfigurationBuilder WithDistributedData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace Akka.Cluster.Hosting
{
public static class AkkaClusterHostingExtensions
{
public static Akka.Hosting.AkkaConfigurationBuilder WithAkkaClusterReadinessCheck(this Akka.Hosting.AkkaConfigurationBuilder builder) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithAkkaClusterReadinessCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable<string>? tags = null) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithClusterClient<TKey>(this Akka.Hosting.AkkaConfigurationBuilder builder, System.Collections.Generic.IEnumerable<string> initialContacts) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithClusterClient<TKey>(this Akka.Hosting.AkkaConfigurationBuilder builder, System.Collections.Generic.IList<Akka.Actor.ActorPath> initialContacts) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithClusterClient<TKey>(this Akka.Hosting.AkkaConfigurationBuilder builder, System.Collections.Generic.IEnumerable<Akka.Actor.Address> initialContactAddresses, string receptionistActorName = "receptionist") { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ namespace Akka.Hosting
public static Akka.Hosting.AkkaConfigurationBuilder AddSetup(this Akka.Hosting.AkkaConfigurationBuilder builder, Akka.Actor.Setup.Setup setup) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActorAskTimeout(this Akka.Hosting.AkkaConfigurationBuilder builder, System.TimeSpan timeout) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActorRefProvider(this Akka.Hosting.AkkaConfigurationBuilder builder, Akka.Actor.ProviderSelection providerSelection) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActorSystemLivenessCheck(this Akka.Hosting.AkkaConfigurationBuilder builder) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActorSystemLivenessCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable<string>? tags = null) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActors(this Akka.Hosting.AkkaConfigurationBuilder builder, Akka.Hosting.ActorStarter actorStarter) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActors(this Akka.Hosting.AkkaConfigurationBuilder builder, Akka.Hosting.ActorStarterWithResolver actorStarter) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActors(this Akka.Hosting.AkkaConfigurationBuilder builder, System.Action<Akka.Actor.ActorSystem, Akka.Hosting.IActorRegistry> actorStarter) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithActors(this Akka.Hosting.AkkaConfigurationBuilder builder, System.Action<Akka.Actor.ActorSystem, Akka.Hosting.IActorRegistry, Akka.DependencyInjection.IDependencyResolver> actorStarter) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, Akka.Hosting.IAkkaHealthCheck healthCheck) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, System.Func<Akka.Actor.ActorSystem, Akka.Hosting.ActorRegistry, System.Threading.CancellationToken, System.Threading.Tasks.Task<Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult>> healthCheck) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, Akka.Hosting.IAkkaHealthCheck healthCheck, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable<string>? tags = null, System.TimeSpan? timeout = default) { }
public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, System.Func<Akka.Actor.ActorSystem, Akka.Hosting.ActorRegistry, System.Threading.CancellationToken, System.Threading.Tasks.Task<Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult>> healthCheck, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable<string>? tags = null, System.TimeSpan? timeout = default) { }
}
public class DeadLetterOptions
{
Expand Down
16 changes: 14 additions & 2 deletions src/Akka.Hosting.Tests/HealthChecks/HealthChecksSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected override void OnReceive(object message)
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
builder
.WithActorSystemLivenessCheck() // have to opt-in to the built-in health check
.WithActorSystemLivenessCheck(failureStatus: HealthStatus.Degraded, tags: ["custom", "liveness"]) // test custom parameters
.WithHealthCheck("FooActor alive", async (system, registry, cancellationToken) =>
{
/*
Expand Down Expand Up @@ -61,7 +61,7 @@ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IService
}

return HealthCheckResult.Healthy("fooActor found and responsive");
});
}, failureStatus: HealthStatus.Degraded, tags: ["foo", "actor"], timeout: TimeSpan.FromSeconds(30));
}

[Fact]
Expand All @@ -78,6 +78,12 @@ public async Task ShouldHaveDefaultHealthCheckRegistration()
// find the built-in implementation
var actorSystemHealthCheckRegistration =
configurationBuilder.HealthChecks.Values.Single(c => c.Factory(Host.Services) is ActorSystemLivenessCheck);

// Verify the custom parameters we set in ConfigureAkka were applied
Assert.Equal(HealthStatus.Degraded, actorSystemHealthCheckRegistration.FailureStatus);
Assert.Contains("custom", actorSystemHealthCheckRegistration.Tags);
Assert.Contains("liveness", actorSystemHealthCheckRegistration.Tags);

var akkaHealthCheckContext = new AkkaHealthCheckContext(Sys)
{ Registration = actorSystemHealthCheckRegistration.ToHealthCheckRegistration() };

Expand All @@ -99,6 +105,12 @@ public async Task ShouldReturnAppropriateResults()
var customActorHealthCheck =
configurationBuilder.HealthChecks.Values.Single(c => c.Factory(Host.Services) is DelegateHealthCheck);

// Verify custom parameters for the delegate health check
Assert.Equal(HealthStatus.Degraded, customActorHealthCheck.FailureStatus);
Assert.Contains("foo", customActorHealthCheck.Tags);
Assert.Contains("actor", customActorHealthCheck.Tags);
Assert.Equal(TimeSpan.FromSeconds(30), customActorHealthCheck.Timeout);

var akkaHealthCheckContext = new AkkaHealthCheckContext(Sys)
{ Registration = customActorHealthCheck.ToHealthCheckRegistration() };

Expand Down
Loading