Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions src/Microsoft.Azure.SignalR.Protocols/ServiceMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,53 @@

namespace Microsoft.Azure.SignalR.Protocol
{
public enum ConnectionFlowControlOperation
{
/// <summary>
/// Send from ASRS to the app server.
/// Asking the client connection to pause sending messages towards ASRS.
/// </summary>
Pause = 1,

/// <summary>
/// Reply from the app server to ASRS.
/// Ackknowledge that the message sending towards ASRS has been pasued.
/// </summary>
PauseAck = 2,

/// <summary>
/// Send from ASRS to the app server.
/// Asking the client connection to resume sending messages towards ASRS.
/// </summary>
Resume = 3,

/// <summary>
/// Send from ASRS to the app server.
/// Asking the app server to send messages through other server connections.
/// </summary>
Offline = 4,
}

public enum ConnectionType
{
/// <summary>
/// Client connection
/// </summary>
Client = 1,
/// <summary>
/// Server connection
/// </summary>
Server = 2,
}

/// <summary>
/// Interface of ack-able message
/// </summary>
public interface IAckableMessage
{
int AckId { get; set; }
}

/// <summary>
/// Base class of messages between Azure SignalR Service and SDK.
/// </summary>
Expand All @@ -25,8 +72,11 @@ public abstract class ServiceMessage
public abstract class ExtensibleServiceMessage : ServiceMessage
{
private const int TracingId = 1;

private const int Ttl = 2;

private const int Protocol = 3;

private const int Filter = 4;
private const int DataMessageType = 5;
private const int IsPartial = 6;
Expand Down Expand Up @@ -157,11 +207,17 @@ internal void ReadExtensionMembers(ref MessagePackReader reader)
}

/// <summary>
/// Interface of ack-able message
/// The ASRS send it to the source server connection to initiate a client connection migration.
/// Indicates that incoming messages are blocked.
/// </summary>
public interface IAckableMessage
/// <param name="connectionId">The client connection ID</param>
/// <param name="op">The operation</param>
/// <param name="type">The connection type</param>
public class ConnectionFlowControlMessage(string connectionId, ConnectionFlowControlOperation op, ConnectionType type = ConnectionType.Client) : ConnectionMessage(connectionId)
{
int AckId { get; set; }
public ConnectionFlowControlOperation Operation { get; } = op;

public ConnectionType ConnectionType { get; } = type;
}

/// <summary>
Expand All @@ -176,7 +232,7 @@ public class AccessKeyRequestMessage : ExtensibleServiceMessage

/// <summary>
/// Gets or sets the key Id.
/// <c>null</c>
/// <c>null</c>
/// </summary>
public string Kid { get; set; }

Expand Down
48 changes: 48 additions & 0 deletions src/Microsoft.Azure.SignalR.Protocols/ServiceProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ private static ServiceMessage ParseMessage(ref MessagePackReader reader)
return CreateErrorCompletionMessage(ref reader, arrayLength);
case ServiceProtocolConstants.ServiceMappingMessageType:
return CreateServiceMappingMessage(ref reader, arrayLength);
case ServiceProtocolConstants.ConnectionFlowControlMessageType:
return CreateConnectionFlowControlMessage(ref reader, arrayLength);
default:
// Future protocol changes can add message types, old clients can ignore them
return null;
Expand Down Expand Up @@ -303,6 +305,9 @@ private static void WriteMessageCore(ref MessagePackWriter writer, ServiceMessag
case ServiceMappingMessage serviceMappingMessage:
WriteServiceMappingMessage(ref writer, serviceMappingMessage);
break;
case ConnectionFlowControlMessage connectionFlowControlMessage:
WriteConnectionFlowControlMessage(ref writer, connectionFlowControlMessage);
break;
default:
throw new InvalidDataException($"Unexpected message type: {message.GetType().Name}");
}
Expand Down Expand Up @@ -700,6 +705,16 @@ private static void WriteServiceMappingMessage(ref MessagePackWriter writer, Ser
message.WriteExtensionMembers(ref writer);
}

private static void WriteConnectionFlowControlMessage(ref MessagePackWriter writer, ConnectionFlowControlMessage message)
{
writer.WriteArrayHeader(5);
writer.Write(ServiceProtocolConstants.ConnectionFlowControlMessageType);
writer.Write(message.ConnectionId);
writer.WriteInt32((int)message.ConnectionType);
writer.WriteInt32((int)message.Operation);
message.WriteExtensionMembers(ref writer);
}

private static void WriteStringArray(ref MessagePackWriter writer, IReadOnlyList<string> array)
{
if (array?.Count > 0)
Expand Down Expand Up @@ -1286,6 +1301,39 @@ private static ServiceMappingMessage CreateServiceMappingMessage(ref MessagePack
return result;
}

private static ConnectionFlowControlMessage CreateConnectionFlowControlMessage(ref MessagePackReader reader, int arrayLength)
{
var connectionId = ReadString(ref reader, "connectionId");
var connectionType = ReadInt32(ref reader, "connectionType");
var operation = ReadInt32(ref reader, "operation");

switch (connectionType)
{
case (int)ConnectionType.Client:
case (int)ConnectionType.Server:
break;
default:
throw new InvalidDataException($"Unsupported connection type: {connectionType}");
}

switch (operation)
{
case (int)ConnectionFlowControlOperation.Pause:
case (int)ConnectionFlowControlOperation.PauseAck:
case (int)ConnectionFlowControlOperation.Resume:
case (int)ConnectionFlowControlOperation.Offline:
break;
default:
throw new InvalidDataException($"Unsupported operation: {operation}");
}

var result = new ConnectionFlowControlMessage(
connectionId,
(ConnectionFlowControlOperation)operation,
(ConnectionType)connectionType);
return result;
}

private static Claim[] ReadClaims(ref MessagePackReader reader)
{
var claimCount = ReadMapLength(ref reader, "claims");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ public static class ServiceProtocolConstants
public const int ErrorCompletionMessageType = 36;
public const int ServiceMappingMessageType = 37;
public const int ConnectionReconnectMessageType = 38;
public const int ConnectionFlowControlMessageType = 39;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.IO;
using Microsoft.Azure.SignalR.Protocol;
using Xunit;

namespace Microsoft.Azure.SignalR.Protocols.Tests;

public class ConnectionFlowControlMessageFacts
{
[Theory]
[InlineData(0, 0)]
[InlineData((int)ConnectionType.Server, 0)]
[InlineData(0, (int)ConnectionFlowControlOperation.Offline)]
public void TestThrowsInvalidDataException(int connectionType, int operation)
{
var message = new ConnectionFlowControlMessage(
"conn1",
(ConnectionFlowControlOperation)operation,
(ConnectionType)connectionType);
var protocol = new ServiceProtocol();
var bytes = protocol.GetMessageBytes(message);

var seq = new System.Buffers.ReadOnlySequence<byte>(bytes);
Assert.Throws<InvalidDataException>(() => protocol.TryParseMessage(ref seq, out var result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public bool Equals(ServiceMessage x, ServiceMessage y)
return ErrorCompletionMessageEqual(errorCompletionMessage, (ErrorCompletionMessage)y);
case ServiceMappingMessage serviceMappingMessage:
return ServiceMappingMessageEqual(serviceMappingMessage, (ServiceMappingMessage)y);
case ConnectionFlowControlMessage connectionFlowControlMessage:
return ConnectionFlowControlMessageEqual(connectionFlowControlMessage, (ConnectionFlowControlMessage)y);
default:
throw new InvalidOperationException($"Unknown message type: {x.GetType().FullName}");
}
Expand Down Expand Up @@ -386,6 +388,13 @@ private bool ServiceMappingMessageEqual(ServiceMappingMessage x, ServiceMappingM
StringEqual(x.InstanceId, y.InstanceId);
}

private bool ConnectionFlowControlMessageEqual(ConnectionFlowControlMessage x, ConnectionFlowControlMessage y)
{
return StringEqual(x.ConnectionId, y.ConnectionId) &&
Equals(x.ConnectionType, y.ConnectionType) &&
Equals(x.Operation, y.Operation);
}

private static bool StringEqual(string x, string y)
{
return string.Equals(x, y, StringComparison.Ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public static IEnumerable<object[]> TestParseOldData
}.ToDictionary(t => t.Name);

#pragma warning disable CS0618 // Type or member is obsolete

public static IDictionary<string, ProtocolTestData> TestData => new[]
{
new ProtocolTestData(
Expand Down Expand Up @@ -693,7 +694,16 @@ public static IEnumerable<object[]> TestParseOldData
name: "ServiceMappingMessage",
message: new ServiceMappingMessage("invocationId", "conn1", "instance1"),
binary: "lSWsaW52b2NhdGlvbklkpWNvbm4xqWluc3RhbmNlMYA="),
new ProtocolTestData(
name: nameof(ConnectionFlowControlMessage) + "-1",
message: new ConnectionFlowControlMessage("conn1", ConnectionFlowControlOperation.Pause, ConnectionType.Client),
binary: "lSelY29ubjHSAAAAAdIAAAABgA=="),
new ProtocolTestData(
name: nameof(ConnectionFlowControlMessage) + "-2",
message: new ConnectionFlowControlMessage("conn2", ConnectionFlowControlOperation.Offline, ConnectionType.Server),
binary: "lSelY29ubjLSAAAAAtIAAAAEgA=="),
}.ToDictionary(t => t.Name);

#pragma warning restore CS0618 // Type or member is obsolete

[Theory]
Expand Down Expand Up @@ -840,7 +850,9 @@ private static ServiceMessage ParseServiceMessage(byte[] bytes)
public class ProtocolTestData
{
public string Name { get; private set; }

public string Binary { get; private set; }

public ServiceMessage Message { get; private set; }

public ProtocolTestData(string name, ServiceMessage message, string binary)
Expand Down