Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 49 additions & 31 deletions src/Microsoft.Azure.SignalR.Common/ServiceConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal abstract class ServiceConnectionBase : IServiceConnection

private bool _isStopped;
private long _lastReceiveTimestamp;
private volatile bool _isConnected;
protected ConnectionContext _connection;

public Task WaitForConnectionStart => _serviceConnectionStartTcs.Task;
Expand All @@ -56,19 +57,38 @@ public ServiceConnectionBase(IServiceProtocol serviceProtocol, ILogger logger, s

public async Task StartAsync()
{
int retryCount = 0;
while (!_isStopped)
{
// If we are not able to start, we will quit this connection.
if (!await StartAsyncCore())
{
_serviceConnectionStartTcs.TrySetResult(false);
return;

await Task.Delay(GetRetryDelay(ref retryCount));
continue;
}

_serviceConnectionStartTcs.TrySetResult(true);

retryCount = 0;
_isConnected = true;
await ProcessIncomingAsync();
_isConnected = false;
}
}

/// <summary>
/// exponential back off with max 1 minute.
/// </summary>
public static TimeSpan GetRetryDelay(ref int retryCount)
{
// retry count: 0, 1, 2, 3, 4, 5, 6, ...
// delay seconds: 1, 2, 4, 8, 16, 32, 60, ...
if (retryCount > 5)
{
return TimeSpan.FromMinutes(1) + ReconnectInterval;
}
return TimeSpan.FromSeconds(1 << retryCount++) + ReconnectInterval;
}

// For test purpose only
Expand All @@ -79,6 +99,9 @@ public Task StopAsync()
return Task.CompletedTask;
}

// For test purpose only
public bool IsConnected => _isConnected;

public async virtual Task WriteAsync(ServiceMessage serviceMessage)
{
// We have to lock around outgoing sends since the pipe is single writer.
Expand Down Expand Up @@ -121,44 +144,39 @@ public async virtual Task WriteAsync(ServiceMessage serviceMessage)

private async Task<bool> StartAsyncCore()
{
// Always try until connected
while (true)
// Lock here in case somebody tries to send before the connection is assigned
await _serviceConnectionLock.WaitAsync();

try
{
// Lock here in case somebody tries to send before the connection is assigned
await _serviceConnectionLock.WaitAsync();
_connection = await CreateConnection();

try
if (await HandshakeAsync())
{
_connection = await CreateConnection();

if (await HandshakeAsync())
{
Log.ServiceConnectionConnected(_logger, _connectionId);
return true;
}
else
{
// False means we got a HandshakeResponseMessage with error. Will take below actions:
// - Dispose the connection
// - Stop reconnect
await DisposeConnection();

return false;
}
Log.ServiceConnectionConnected(_logger, _connectionId);
return true;
}
catch (Exception ex)
else
{
Log.FailedToConnect(_logger, ex);

// False means we got a HandshakeResponseMessage with error. Will take below actions:
// - Dispose the connection
await DisposeConnection();

await Task.Delay(ReconnectInterval);
}
finally
{
_serviceConnectionLock.Release();
return false;
}
}
catch (Exception ex)
{
Log.FailedToConnect(_logger, ex);

await DisposeConnection();

return false;
}
finally
{
_serviceConnectionLock.Release();
}
}

private async Task<bool> HandshakeAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Xunit;

namespace Microsoft.Azure.SignalR.Common.Tests
{
public class ServiceConnectionBaseTests
{
[Theory]
[InlineData(0, 1, 1, 2)]
[InlineData(1, 2, 2, 3)]
[InlineData(2, 3, 4, 5)]
[InlineData(3, 4, 8, 9)]
[InlineData(4, 5, 16, 17)]
[InlineData(5, 6, 32, 33)]
[InlineData(6, 6, 60, 61)]
[InlineData(600, 600, 60, 61)]
public void TestGetRetryDelay(int count, int exitCount, int minSeconds, int maxSeconds)
{
var c = count;
var span = ServiceConnectionBase.GetRetryDelay(ref c);
Assert.Equal(exitCount, c);
Assert.True(TimeSpan.FromSeconds(minSeconds) < span);
Assert.True(TimeSpan.FromSeconds(maxSeconds) > span);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public Task StartAsync()
return ServiceConnection.StartAsync();
}

public bool IsConnected => ServiceConnection.IsConnected;

public async Task ProcessApplicationMessagesAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
Expand All @@ -20,14 +21,17 @@ internal class TestConnectionFactory : IConnectionFactory

public virtual TestConnection CurrentConnectionContext { get; private set; }

public List<DateTime> Times { get; } = new List<DateTime>();

public TestConnectionFactory(Func<TestConnection, Task> connectCallback = null)
{
_connectCallback = connectCallback;
}

}
public async Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat, string connectionId,
CancellationToken cancellationToken = default)
{
Times.Add(DateTime.Now);
CurrentConnectionContext = null;

var connection = new TestConnection();
Expand Down
56 changes: 44 additions & 12 deletions test/Microsoft.Azure.SignalR.Tests/ServiceConnectionFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public async Task ServiceConnectionStartsConnection()
var proxy = new ServiceConnectionProxy();

var serverTask = proxy.WaitForServerConnectionAsync(1);
_ = proxy.StartAsync();
_ = proxy.StartAsync();
await serverTask.OrTimeout();

Assert.Empty(proxy.ClientConnectionManager.ClientConnections);
Expand Down Expand Up @@ -57,9 +57,9 @@ public async Task ServiceConnectionStartsConnection()
const string headerKey1 = "custom-header-1";
const string headerValue1 = "custom-value-1";
const string headerKey2 = "custom-header-2";
var headerValue2 = new[] {"custom-value-2a", "custom-value-2b"};
var headerValue2 = new[] { "custom-value-2a", "custom-value-2b" };
const string headerKey3 = "custom-header-3";
var headerValue3 = new[] {"custom-value-3a", "custom-value-3b", "custom-value-3c"};
var headerValue3 = new[] { "custom-value-3a", "custom-value-3b", "custom-value-3c" };
const string path = "/this/is/user/path";

await proxy.WriteMessageAsync(new OpenConnectionMessage(connectionId2, null,
Expand Down Expand Up @@ -225,7 +225,7 @@ public async Task ReconnectWhenKeepAliveFailed()
// Wait for 35s to make the server side timeout
// Assert the server will reconnect
var serverTask2 = proxy.WaitForServerConnectionAsync(2);
Assert.False(Task.WaitAll(new Task[] {serverTask2}, TimeSpan.FromSeconds(1)));
Assert.False(Task.WaitAll(new Task[] { serverTask2 }, TimeSpan.FromSeconds(1)));

await Task.Delay(TimeSpan.FromSeconds(35));

Expand All @@ -243,22 +243,33 @@ public async Task ReconnectWhenHavingIntermittentConnectivityFailure()

var serverTask = proxy.WaitForServerConnectionAsync(1);
_ = proxy.StartAsync();
await serverTask.OrTimeout();
// fail 3 times, 1~2 + 2~3 + 4~5 = 7~10
await serverTask.OrTimeout(11 * 1000);

var connectionId = Guid.NewGuid().ToString("N");

var connectionTask = proxy.WaitForConnectionAsync(connectionId);
await proxy.WriteMessageAsync(new OpenConnectionMessage(connectionId, null));
await connectionTask.OrTimeout();

Assert.True(proxy.IsConnected);
var list = connectionFactory.Times;
Assert.True(TimeSpan.FromSeconds(0.9) < list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(2.1) > list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(1.9) < list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.1) > list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.9) < list[3] - list[2]);
Assert.True(TimeSpan.FromSeconds(5.1) > list[3] - list[2]);
}

/// <summary>
/// Service connection should stop reconnecting to service after receiving a handshake response with error message.
/// Service connection should reconnecting to service after receiving a handshake response with error message.
/// </summary>
[Fact]
public async Task StopReconnectAfterReceivingHandshakeErrorMessage()
public async Task ReconnectAfterReceivingHandshakeErrorMessage()
{
var proxy = new ServiceConnectionProxy(connectionFactory: new TestConnectionFactoryWithHandshakeError());
var connectionFactory = new TestConnectionFactoryWithHandshakeError();
var proxy = new ServiceConnectionProxy(connectionFactory: connectionFactory);

var serverTask = proxy.WaitForServerConnectionAsync(1);
_ = proxy.StartAsync();
Expand All @@ -268,9 +279,19 @@ public async Task StopReconnectAfterReceivingHandshakeErrorMessage()
var connectionTask = proxy.WaitForConnectionAsync(connectionId);

await proxy.WriteMessageAsync(new OpenConnectionMessage(connectionId, null));

// Connection exits so the Task should be timeout
Assert.False(Task.WaitAll(new Task[] {connectionTask}, TimeSpan.FromSeconds(1)));
Assert.False(Task.WaitAll(new Task[] { connectionTask }, TimeSpan.FromSeconds(1)));

await Task.Delay(10 * 1000);
Assert.False(proxy.IsConnected);
var list = connectionFactory.Times;
Assert.True(TimeSpan.FromSeconds(0.9) < list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(2.1) > list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(1.9) < list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.1) > list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.9) < list[3] - list[2]);
Assert.True(TimeSpan.FromSeconds(5.1) > list[3] - list[2]);
}

/// <summary>
Expand All @@ -285,13 +306,24 @@ public async Task ReconnectWhenHandshakeThrowException()
// Throw exception for 3 times and will be success in the 4th retry
var serverTask = proxy.WaitForServerConnectionAsync(4);
_ = proxy.StartAsync();
await serverTask.OrTimeout();
// fail 3 times, 1~2 + 2~3 + 4~5 = 7~10
await serverTask.OrTimeout(11 * 1000);

var connectionId = Guid.NewGuid().ToString("N");

var connectionTask = proxy.WaitForConnectionAsync(connectionId);
await proxy.WriteMessageAsync(new OpenConnectionMessage(connectionId, null));
await connectionTask.OrTimeout();

Assert.True(proxy.IsConnected);
var list = connectionFactory.Times;
Assert.True(TimeSpan.FromSeconds(0.9) < list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(2.1) > list[1] - list[0]);
Assert.True(TimeSpan.FromSeconds(1.9) < list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.1) > list[2] - list[1]);
Assert.True(TimeSpan.FromSeconds(3.9) < list[3] - list[2]);
Assert.True(TimeSpan.FromSeconds(5.1) > list[3] - list[2]);

}

/// <summary>
Expand All @@ -308,7 +340,7 @@ public async Task ReconnectWhenConnectionThrowException()

// Try to wait the second handshake after reconnect
var serverTask2 = proxy.WaitForServerConnectionAsync(2);
Assert.False(Task.WaitAll(new Task[] {serverTask2 }, TimeSpan.FromSeconds(1)));
Assert.False(Task.WaitAll(new Task[] { serverTask2 }, TimeSpan.FromSeconds(1)));

// Dispose the connection, then server will throw exception and reconnect
serverConnection1.Transport.Input.CancelPendingRead();
Expand Down