Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/Microsoft.Azure.SignalR.Common/Endpoints/HubServiceEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ namespace Microsoft.Azure.SignalR
{
internal class HubServiceEndpoint : ServiceEndpoint
{
private readonly TaskCompletionSource<bool> _scaleTcs;
private readonly ServiceEndpoint _endpoint;
private readonly long _uniqueIndex;
private static long s_currentIndex;
private static bool _pendingReload;
private TaskCompletionSource<bool> _scaleTcs;

public HubServiceEndpoint(
string hub,
Expand All @@ -22,7 +23,8 @@ ServiceEndpoint endpoint
Hub = hub;
Provider = provider;
_endpoint = endpoint;
_scaleTcs = endpoint.IsStagingScale ? new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously) : null;
_pendingReload = endpoint.PendingReload;
_scaleTcs = endpoint.PendingReload ? new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously) : null;
_uniqueIndex = Interlocked.Increment(ref s_currentIndex);
}

Expand All @@ -41,14 +43,24 @@ ServiceEndpoint endpoint

public void CompleteScale()
{
_pendingReload = false;
_scaleTcs?.TrySetResult(true);
}

// When remove an existing HubServiceEndpoint.
public void ResetScale()
{
_pendingReload = true;
_scaleTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public long UniqueIndex => _uniqueIndex;

public override string ToString()
{
return base.ToString() + $"(hub={Hub})";
}

internal override bool PendingReload => _pendingReload;
}
}
27 changes: 2 additions & 25 deletions src/Microsoft.Azure.SignalR.Common/Endpoints/ServiceEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public Uri ClientEndpoint

internal AccessKey AccessKey { get; private set; }

// Flag to indicate a updaing endpoint need staging
internal bool IsStagingScale { get; set; } = false;
// Flag to indicate an updaing endpoint needs staging
internal virtual bool PendingReload { get; set; } = false;

/// <summary>
/// Connection string constructor with nameWithEndpointType
Expand Down Expand Up @@ -172,29 +172,6 @@ public ServiceEndpoint(ServiceEndpoint other)
}
}

/// <summary>
/// Constructor to create endpoint instance with instant update properties.
/// </summary>
/// <param name="original"></param>
/// <param name="name"></param>
/// <param name="clientEndpoint"></param>
internal ServiceEndpoint(ServiceEndpoint original, string name, Uri clientEndpoint)
{
if (original != null)
{
ConnectionString = original.ConnectionString;
EndpointType = original.EndpointType;
Name = name ?? original.Name;
Version = original.Version;
AccessKey = original.AccessKey;
Endpoint = original.Endpoint;
ClientEndpoint = clientEndpoint ?? original.ClientEndpoint;
ServerEndpoint = original.ServerEndpoint;
AudienceBaseUrl = original.AudienceBaseUrl;
_serviceEndpoint = original._serviceEndpoint;
}
}

public override string ToString()
{
var prefix = string.IsNullOrEmpty(Name) ? string.Empty : $"[{Name}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -144,30 +145,45 @@ private async Task RemoveServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> en

private void AddEndpointsToNegotiationStore(Dictionary<string, IReadOnlyList<HubServiceEndpoint>> endpoints)
{
foreach (var hub in _endpointsPerHub.Keys)
foreach (var hubEndpoints in _endpointsPerHub)
{
if (!endpoints.TryGetValue(hub, out var updatedEndpoints)
if (!endpoints.TryGetValue(hubEndpoints.Key, out var updatedEndpoints)
|| updatedEndpoints.Count == 0)
{
continue;
}
var oldEndpoints = _endpointsPerHub[hub];
var oldEndpoints = _endpointsPerHub[hubEndpoints.Key];
var newEndpoints = oldEndpoints.ToList();
newEndpoints.AddRange(updatedEndpoints);
_endpointsPerHub.TryUpdate(hub, newEndpoints, oldEndpoints);
_endpointsPerHub.TryUpdate(hubEndpoints.Key, newEndpoints, oldEndpoints);
}
}

private IReadOnlyList<HubServiceEndpoint> UpdateAndGetRemovedHubServiceEndpoints(IEnumerable<ServiceEndpoint> endpoints)
{
var removedEndpoints = new List<HubServiceEndpoint>();
foreach (var hub in _endpointsPerHub.Keys)
foreach (var hubEndpoints in _endpointsPerHub)
{
var oldEndpoints = _endpointsPerHub[hub];
var updatedEndpoints = CreateHubServiceEndpoints(hub, endpoints);
removedEndpoints.AddRange(updatedEndpoints);
var newEndpoints = oldEndpoints.Except(updatedEndpoints).ToList();
_endpointsPerHub.TryUpdate(hub, newEndpoints, oldEndpoints);
var remainedEndpoints = new List<HubServiceEndpoint>();
var oldEndpoints = _endpointsPerHub[hubEndpoints.Key];
foreach (var endpoint in oldEndpoints)
{
var remove = endpoints.FirstOrDefault(e => e.Equals(endpoint));
if (remove != null)
{
// Refer to reload detector to reset scale task.
if (remove.PendingReload)
{
endpoint.ResetScale();
}
removedEndpoints.Add(endpoint);
}
else
{
remainedEndpoints.Add(endpoint);
}
}
_endpointsPerHub.TryUpdate(hubEndpoints.Key, remainedEndpoints, oldEndpoints);
}
return removedEndpoints;
}
Expand Down Expand Up @@ -196,9 +212,9 @@ private IReadOnlyList<HubServiceEndpoint> CreateHubServiceEndpoints(string hub,
private Dictionary<string, IReadOnlyList<HubServiceEndpoint>> CreateHubServiceEndpoints(IEnumerable<ServiceEndpoint> endpoints)
{
var hubEndpoints = new Dictionary<string, IReadOnlyList<HubServiceEndpoint>>();
foreach (var hub in _endpointsPerHub.Keys)
foreach (var item in _endpointsPerHub)
{
hubEndpoints.Add(hub, CreateHubServiceEndpoints(hub, endpoints));
hubEndpoints.Add(item.Key, CreateHubServiceEndpoints(item.Key, endpoints));
}
return hubEndpoints;
}
Expand Down Expand Up @@ -249,10 +265,10 @@ private void UpdateEndpoints(Dictionary<ServiceEndpoint, ServiceEndpoint> update
// Get staging required endpoints
var removed = Endpoints.Keys.Except(updatedEndpoints.Keys, new ServiceEndpointWeakComparer()).ToList();
var added = updatedEndpoints.Keys.Except(Endpoints.Keys, new ServiceEndpointWeakComparer()).ToList();
removed.ForEach(e => e.IsStagingScale = true);
removed.ForEach(e => e.PendingReload = true);
foreach (var item in added)
{
item.IsStagingScale = true;
item.PendingReload = true;
endpoints.Add(item, item);
}

Expand All @@ -262,13 +278,12 @@ private void UpdateEndpoints(Dictionary<ServiceEndpoint, ServiceEndpoint> update
.Except(Endpoints.Keys);
foreach (var endpoint in commonEndpoints)
{
// search exist from old
// search exist from old to remove
var exist = Endpoints.First(x => x.Key.Endpoint == endpoint.Endpoint);
removed.Add(exist.Key);

var updated = new ServiceEndpoint(exist.Key, endpoint.Name, endpoint.ClientEndpoint);
added.Add(updated);
endpoints.Add(updated, updated);
added.Add(endpoint);
endpoints.Add(endpoint, endpoint);
}
removedEndpoints = removed;
addedEndpoints = added;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,17 @@ private void OnAdd(HubServiceEndpoint endpoint)
{
return;
}
if (endpoint.ScaleTask.IsCompleted)
{
UpdateEndpointsStore(endpoint, ScaleOperation.Add);
}
else
{
_ = AddHubServiceEndpointAsync(endpoint);
}
_ = AddHubServiceEndpointAsync(endpoint);
}

private async Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
if (!endpoint.PendingReload)
{
UpdateEndpointsStore(endpoint, ScaleOperation.Add);
return;
}

var container = _generator(endpoint);
endpoint.ConnectionContainer = container;

Expand Down Expand Up @@ -292,18 +291,16 @@ private void OnRemove(HubServiceEndpoint endpoint)
{
return;
}
if (endpoint.ScaleTask.IsCompleted)
{
UpdateEndpointsStore(endpoint, ScaleOperation.Remove);
}
else
{
_ = RemoveHubServiceEndpointAsync(endpoint);
}
_ = RemoveHubServiceEndpointAsync(endpoint);
}

private async Task RemoveHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
if (!endpoint.PendingReload)
{
UpdateEndpointsStore(endpoint, ScaleOperation.Remove);
return;
}
try
{
var container = _routerEndpoints.endpoints.FirstOrDefault(e => e.Endpoint == endpoint.Endpoint && e.EndpointType == endpoint.EndpointType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public async Task TestMultipleEndpointWithRenamesAndWriteAckableMessage()
new ServiceEndpoint(ConnectionString2, EndpointType.Primary, "2"),
new ServiceEndpoint(ConnectionString3, EndpointType.Secondary, "33")
};
await sem.TestReloadServiceEndpoints(renamedEndpoint);
await sem.TestReloadServiceEndpoints(renamedEndpoint, 10);

// validate container level updates
containerEps = container.GetOnlineEndpoints().OrderBy(x => x.Name).ToArray();
Expand Down Expand Up @@ -924,7 +924,7 @@ public async Task TestMultipleEndpointWithClientEndpointUpdates()
new ServiceEndpoint(ConnectionString2, EndpointType.Primary, "2"),
new ServiceEndpoint(connstr3, EndpointType.Secondary, "3")
};
await sem.TestReloadServiceEndpoints(updateEndpoints);
await sem.TestReloadServiceEndpoints(updateEndpoints, 10);

// validate container level updates
containerEps = container.GetOnlineEndpoints().OrderBy(x => x.Name).ToArray();
Expand Down