Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,32 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

internal class SiloMetadataCache(
ISiloMetadataClient siloMetadataClient,
MembershipTableManager membershipTableManager,
IOptions<ClusterMembershipOptions> clusterMembershipOptions,
ILogger<SiloMetadataCache> logger)
: ISiloMetadataCache, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly ConcurrentDictionary<SiloAddress, SiloMetadata> _metadata = new();
private readonly Dictionary<SiloAddress, DateTime> _negativeCache = new();
private readonly CancellationTokenSource _cts = new();
private TimeSpan negativeCachePeriod;

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
Task? task = null;
Task OnStart(CancellationToken _)
{
// This gives time for the cluster to be voted Dead and for membership updates to propagate that out
negativeCachePeriod = clusterMembershipOptions.Value.ProbeTimeout * clusterMembershipOptions.Value.NumMissedProbesLimit
+ (2 * clusterMembershipOptions.Value.TableRefreshTimeout);
task = Task.Run(() => this.ProcessMembershipUpdates(_cts.Token));
return Task.CompletedTask;
}
Expand Down Expand Up @@ -51,26 +59,39 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status is SiloStatus.Active or SiloStatus.Joining))
var now = DateTime.UtcNow;
var recentlyActiveSilos = update.Entries
.Where(e => e.Value.Status is SiloStatus.Active or SiloStatus.Joining)
.Where(e => !e.Value.HasMissedIAmAlives(clusterMembershipOptions.Value, now));
foreach (var membershipEntry in recentlyActiveSilos)
{
if (!_metadata.ContainsKey(membershipEntry.Key))
{
if (_negativeCache.TryGetValue(membershipEntry.Key, out var expiration) && expiration > now)
{
continue;
}
try
{
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key).WaitAsync(ct);
_metadata.TryAdd(membershipEntry.Key, metadata);
_negativeCache.Remove(membershipEntry.Key, out _);
}
catch(Exception exception)
{
_negativeCache.TryAdd(membershipEntry.Key, now + negativeCachePeriod);
logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
}
}
}

// Remove entries for members that are now dead
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status == SiloStatus.Dead))
var deadSilos = update.Entries
.Where(e => e.Value.Status == SiloStatus.Dead);
foreach (var membershipEntry in deadSilos)
{
_metadata.TryRemove(membershipEntry.Key, out _);
_negativeCache.Remove(membershipEntry.Key, out _);
}

// Remove entries for members that are no longer in the table
Expand All @@ -79,6 +100,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
if (!update.Entries.ContainsKey(silo))
{
_metadata.TryRemove(silo, out _);
_negativeCache.Remove(silo, out _);
}
}
}
Expand All @@ -102,4 +124,5 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose() => _cts.Cancel();
}
}