Skip to content

Commit 30d6a43

Browse files
authored
Gracefully offline and close clients when app shutdown (#2086)
* Gracefully offline and close clients when app shutdown * Also fix issue 1777 * Fix flacky test
1 parent f317977 commit 30d6a43

File tree

11 files changed

+391
-206
lines changed

11 files changed

+391
-206
lines changed

src/Microsoft.Azure.SignalR.Common/Constants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ internal static class Constants
2222
public const string AsrsDefaultScope = "https://signalr.azure.com/.default";
2323

2424

25-
public const int DefaultCloseTimeoutMilliseconds = 30000;
25+
public const int DefaultCloseTimeoutMilliseconds = 10000;
2626

2727
public static class Keys
2828
{

src/Microsoft.Azure.SignalR.Common/Utilities/TaskExtenstions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static async Task OrCancelAsync(this Task task, CancellationToken token,
1818
{
1919
// make sure the task throws exception if any
2020
await anyTask;
21+
tcs.TrySetCanceled();
2122
}
2223
else
2324
{

src/Microsoft.Azure.SignalR/ClientConnections/ClientConnectionContext.Log.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ private static class Log
2929
private static readonly Action<ILogger, string, Exception> _connectedStarting =
3030
LoggerMessage.Define<string>(LogLevel.Information, new EventId(6, "ConnectedStarting"), "Connection {TransportConnectionId} started.");
3131

32-
private static readonly Action<ILogger, string, Exception> _detectedLongRunningApplicationTask =
33-
LoggerMessage.Define<string>(LogLevel.Warning, new EventId(7, "DetectedLongRunningApplicationTask"), "The connection {TransportConnectionId} has a long running application logic that prevents the connection from complete.");
32+
private static readonly Action<ILogger, string, int, Exception> _detectedLongRunningApplicationTask =
33+
LoggerMessage.Define<string, int>(LogLevel.Warning, new EventId(7, "DetectedLongRunningApplicationTask"), "The connection {TransportConnectionId} has a long running application logic that prevents the connection from complete after {TimeoutMilliseconds} milliseconds.");
3434

3535
private static readonly Action<ILogger, string, Exception> _outgoingTaskPaused =
3636
LoggerMessage.Define<string>(LogLevel.Information, new EventId(8, "OutgoingTaskPaused"), "Outgoing messages for connection {connectionId} have been paused.");
@@ -71,9 +71,9 @@ public static void ConnectedStarting(ILogger logger, string connectionId)
7171
_connectedStarting(logger, connectionId, null);
7272
}
7373

74-
public static void DetectedLongRunningApplicationTask(ILogger logger, string connectionId)
74+
public static void DetectedLongRunningApplicationTask(ILogger logger, string connectionId, int timeoutMilliseconds)
7575
{
76-
_detectedLongRunningApplicationTask(logger, connectionId, null);
76+
_detectedLongRunningApplicationTask(logger, connectionId, timeoutMilliseconds, null);
7777
}
7878

7979
public static void OutgoingTaskPaused(ILogger logger, string connectionId)

src/Microsoft.Azure.SignalR/ClientConnections/ClientConnectionContext.cs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ internal partial class ClientConnectionContext : ConnectionContext,
7575

7676
private readonly CancellationTokenSource _abortOutgoingCts = new CancellationTokenSource();
7777

78+
private readonly CancellationTokenSource _connectionClosedCts = new CancellationTokenSource();
79+
7880
private readonly object _heartbeatLock = new object();
7981

8082
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
@@ -97,6 +99,10 @@ internal partial class ClientConnectionContext : ConnectionContext,
9799

98100
private long _receivedBytes;
99101

102+
#if !NETSTANDARD
103+
public override CancellationToken ConnectionClosed => _connectionClosedCts.Token;
104+
#endif
105+
100106
public override string ConnectionId { get; set; }
101107

102108
public string InstanceId { get; }
@@ -136,8 +142,6 @@ public bool AbortOnClose
136142

137143
public ILogger<ServiceConnection> Logger { get; init; } = NullLogger<ServiceConnection>.Instance;
138144

139-
private Task DelayTask => Task.Delay(_closeTimeOutMilliseconds);
140-
141145
private CancellationToken OutgoingAborted => _abortOutgoingCts.Token;
142146

143147
public ClientConnectionContext(OpenConnectionMessage serviceMessage,
@@ -330,6 +334,7 @@ internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol pr
330334
var next = buffer;
331335
while (!buffer.IsEmpty && protocol.TryParseMessage(ref next, FakeInvocationBinder.Instance, out var message))
332336
{
337+
// we still want messages to successfully going out when application completes
333338
if (!await _pauseHandler.WaitAsync(StaticRandom.Next(500, 1500), OutgoingAborted))
334339
{
335340
Log.OutgoingTaskPaused(Logger, ConnectionId);
@@ -373,6 +378,10 @@ internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol pr
373378
Application.Input.AdvanceTo(buffer.Start, buffer.End);
374379
}
375380
}
381+
catch (OperationCanceledException)
382+
{
383+
// cancelled
384+
}
376385
catch (ForwardMessageException)
377386
{
378387
// do nothing.
@@ -401,11 +410,6 @@ internal async Task ProcessApplicationAsync(ConnectionDelegate connectionDelegat
401410
// application task can end when exception, or Context.Abort() from hub
402411
await connectionDelegate(this);
403412
}
404-
catch (ObjectDisposedException)
405-
{
406-
// When the application shuts down and disposes IServiceProvider, HubConnectionHandler.RunHubAsync is still running and runs into _dispatcher.OnDisconnectedAsync
407-
// no need to throw the error out
408-
}
409413
catch (Exception ex)
410414
{
411415
// Capture the exception to communicate it to the transport (this isn't strictly required)
@@ -434,14 +438,20 @@ internal async Task PerformDisconnectAsync()
434438
// Wait for the connection's lifetime task to end
435439
// Wait on the application task to complete
436440
// We wait gracefully here to be consistent with self-host SignalR
437-
await Task.WhenAny(LifetimeTask, DelayTask);
438-
439-
if (!LifetimeTask.IsCompleted)
441+
using var cts = new CancellationTokenSource(_closeTimeOutMilliseconds);
442+
try
440443
{
441-
Log.DetectedLongRunningApplicationTask(Logger, ConnectionId);
444+
await LifetimeTask.OrCancelAsync(cts.Token);
445+
cts.Cancel();
442446
}
447+
catch (OperationCanceledException)
448+
{
443449

444-
await LifetimeTask;
450+
Log.DetectedLongRunningApplicationTask(Logger, ConnectionId, _closeTimeOutMilliseconds);
451+
#if !NETSTANDARD
452+
_connectionClosedCts.Cancel();
453+
#endif
454+
}
445455
}
446456

447457
internal async Task ProcessConnectionDataMessageAsync(ConnectionDataMessage connectionDataMessage)

src/Microsoft.Azure.SignalR/HubHost/ServiceHubDispatcher.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,14 @@ public void Start(ConnectionDelegate connectionDelegate, Action<HttpContext> con
134134
public async Task ShutdownAsync()
135135
{
136136
var options = _options.GracefulShutdown;
137-
if (options.Mode == GracefulShutdownMode.Off)
138-
{
139-
return;
140-
}
141-
142137
try
143138
{
144-
var source = new CancellationTokenSource(_options.GracefulShutdown.Timeout);
139+
var source = new CancellationTokenSource(options.Timeout);
145140
await GracefulShutdownAsync(options, source.Token).OrCancelAsync(source.Token);
146141
}
147142
catch (OperationCanceledException)
148143
{
149-
Log.GracefulShutdownTimeoutExceeded(_logger, _hubName, Convert.ToInt32(_options.GracefulShutdown.Timeout.TotalMilliseconds));
144+
Log.GracefulShutdownTimeoutExceeded(_logger, _hubName, Convert.ToInt32(options.Timeout.TotalMilliseconds));
150145
}
151146

152147
Log.StoppingServer(_logger, _hubName);
@@ -158,8 +153,17 @@ private async Task GracefulShutdownAsync(GracefulShutdownOptions options, Cancel
158153
Log.SettingServerOffline(_logger, _hubName);
159154
await _serviceConnectionManager.OfflineAsync(options.Mode, cancellationToken);
160155

161-
Log.TriggeringShutdownHooks(_logger, _hubName);
162-
await options.OnShutdown(Context);
156+
if (options.Mode == GracefulShutdownMode.Off)
157+
{
158+
// By default we directly close all the client connections
159+
Log.CloseClientConnections(_logger, _hubName);
160+
await _serviceConnectionManager.CloseClientConnections(cancellationToken);
161+
}
162+
else
163+
{
164+
Log.TriggeringShutdownHooks(_logger, _hubName);
165+
await options.OnShutdown(Context);
166+
}
163167

164168
Log.WaitingClientConnectionsToClose(_logger, _hubName);
165169
await _clientConnectionManager.WhenAllCompleted();
@@ -217,11 +221,14 @@ private static class Log
217221
LoggerMessage.Define<string>(LogLevel.Information, new EventId(4, "TriggeringShutdownHooks"), "[{hubName}] Triggering shutdown hooks...");
218222

219223
private static readonly Action<ILogger, string, Exception> _waitingClientConnectionsToClose =
220-
LoggerMessage.Define<string>(LogLevel.Information, new EventId(5, "WaitingClientConnectionsToClose"), "[{hubName}] Waiting client connections to close...");
224+
LoggerMessage.Define<string>(LogLevel.Information, new EventId(5, "WaitingClientConnectionsToClose"), "[{hubName}] Waiting for client connections to close...");
221225

222226
private static readonly Action<ILogger, string, Exception> _stoppingServer =
223227
LoggerMessage.Define<string>(LogLevel.Information, new EventId(6, "StoppingServer"), "[{hubName}] Stopping the hub server...");
224228

229+
private static readonly Action<ILogger, string, Exception> _closeClientConnections =
230+
LoggerMessage.Define<string>(LogLevel.Information, new EventId(7, "CloseClientConnections"), "[{hubName}] Closing client connections...");
231+
225232
public static void StartingConnection(ILogger logger, string name, int connectionNumber)
226233
{
227234
_startingConnection(logger, name, connectionNumber, null);
@@ -247,6 +254,11 @@ public static void WaitingClientConnectionsToClose(ILogger logger, string hubNam
247254
_waitingClientConnectionsToClose(logger, hubName, null);
248255
}
249256

257+
public static void CloseClientConnections(ILogger logger, string hubName)
258+
{
259+
_closeClientConnections(logger, hubName, null);
260+
}
261+
250262
public static void StoppingServer(ILogger logger, string hubName)
251263
{
252264
_stoppingServer(logger, hubName, null);

src/Microsoft.Azure.SignalR/ServerConnections/ServiceConnection.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,29 @@ protected override Task DisposeConnection(ConnectionContext connection)
111111
return _connectionFactory.DisposeAsync(connection);
112112
}
113113

114-
public override Task CloseClientConnections(CancellationToken token)
114+
public override async Task CloseClientConnections(CancellationToken token)
115115
{
116-
return Task.WhenAll(_clientConnectionManager.ClientConnections.Select(c => ((ClientConnectionContext)c).PerformDisconnectAsync()));
116+
var tasks = new List<Task>();
117+
foreach (var entity in _connectionIds)
118+
{
119+
if (_clientConnectionManager.TryGetClientConnection(entity.Key, out var c) && c is ClientConnectionContext connection)
120+
{
121+
// batch remove 100 connections once
122+
if (tasks.Count % 100 == 0)
123+
{
124+
await Task.Delay(1);
125+
await Task.WhenAll(tasks);
126+
tasks.Clear();
127+
}
128+
// Add a little delay to avoid too much pressure closing all the clients at one time
129+
tasks.Add(connection.PerformDisconnectAsync());
130+
}
131+
}
132+
133+
if (tasks.Count > 0)
134+
{
135+
await Task.WhenAll(tasks);
136+
}
117137
}
118138

119139
protected override Task CleanupClientConnections(string fromInstanceId = null)
@@ -284,7 +304,6 @@ private async Task ProcessClientConnectionAsync(ClientConnectionContext connecti
284304
// app task completes connection.Transport.Output, which will completes connection.Application.Input and ends the transport
285305
// Transports are written by us and are well behaved, wait for them to drain
286306
connection.CancelOutgoing(true);
287-
288307
// transport never throws
289308
await transport;
290309
}

0 commit comments

Comments
 (0)