Skip to content

Commit df13c44

Browse files
authored
Add migrate connection messages (#1964)
* Add migrate connection messages * update * update
1 parent 99463ae commit df13c44

File tree

6 files changed

+158
-4
lines changed

6 files changed

+158
-4
lines changed

src/Microsoft.Azure.SignalR.Protocols/ServiceMessage.cs

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,53 @@
77

88
namespace Microsoft.Azure.SignalR.Protocol
99
{
10+
public enum ConnectionFlowControlOperation
11+
{
12+
/// <summary>
13+
/// Send from ASRS to the app server.
14+
/// Asking the client connection to pause sending messages towards ASRS.
15+
/// </summary>
16+
Pause = 1,
17+
18+
/// <summary>
19+
/// Reply from the app server to ASRS.
20+
/// Ackknowledge that the message sending towards ASRS has been pasued.
21+
/// </summary>
22+
PauseAck = 2,
23+
24+
/// <summary>
25+
/// Send from ASRS to the app server.
26+
/// Asking the client connection to resume sending messages towards ASRS.
27+
/// </summary>
28+
Resume = 3,
29+
30+
/// <summary>
31+
/// Send from ASRS to the app server.
32+
/// Asking the app server to send messages through other server connections.
33+
/// </summary>
34+
Offline = 4,
35+
}
36+
37+
public enum ConnectionType
38+
{
39+
/// <summary>
40+
/// Client connection
41+
/// </summary>
42+
Client = 1,
43+
/// <summary>
44+
/// Server connection
45+
/// </summary>
46+
Server = 2,
47+
}
48+
49+
/// <summary>
50+
/// Interface of ack-able message
51+
/// </summary>
52+
public interface IAckableMessage
53+
{
54+
int AckId { get; set; }
55+
}
56+
1057
/// <summary>
1158
/// Base class of messages between Azure SignalR Service and SDK.
1259
/// </summary>
@@ -25,8 +72,11 @@ public abstract class ServiceMessage
2572
public abstract class ExtensibleServiceMessage : ServiceMessage
2673
{
2774
private const int TracingId = 1;
75+
2876
private const int Ttl = 2;
77+
2978
private const int Protocol = 3;
79+
3080
private const int Filter = 4;
3181
private const int DataMessageType = 5;
3282
private const int IsPartial = 6;
@@ -157,11 +207,17 @@ internal void ReadExtensionMembers(ref MessagePackReader reader)
157207
}
158208

159209
/// <summary>
160-
/// Interface of ack-able message
210+
/// The ASRS send it to the source server connection to initiate a client connection migration.
211+
/// Indicates that incoming messages are blocked.
161212
/// </summary>
162-
public interface IAckableMessage
213+
/// <param name="connectionId">The client connection ID</param>
214+
/// <param name="op">The operation</param>
215+
/// <param name="type">The connection type</param>
216+
public class ConnectionFlowControlMessage(string connectionId, ConnectionFlowControlOperation op, ConnectionType type = ConnectionType.Client) : ConnectionMessage(connectionId)
163217
{
164-
int AckId { get; set; }
218+
public ConnectionFlowControlOperation Operation { get; } = op;
219+
220+
public ConnectionType ConnectionType { get; } = type;
165221
}
166222

167223
/// <summary>
@@ -176,7 +232,7 @@ public class AccessKeyRequestMessage : ExtensibleServiceMessage
176232

177233
/// <summary>
178234
/// Gets or sets the key Id.
179-
/// <c>null</c>
235+
/// <c>null</c>
180236
/// </summary>
181237
public string Kid { get; set; }
182238

src/Microsoft.Azure.SignalR.Protocols/ServiceProtocol.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ private static ServiceMessage ParseMessage(ref MessagePackReader reader)
123123
return CreateErrorCompletionMessage(ref reader, arrayLength);
124124
case ServiceProtocolConstants.ServiceMappingMessageType:
125125
return CreateServiceMappingMessage(ref reader, arrayLength);
126+
case ServiceProtocolConstants.ConnectionFlowControlMessageType:
127+
return CreateConnectionFlowControlMessage(ref reader, arrayLength);
126128
default:
127129
// Future protocol changes can add message types, old clients can ignore them
128130
return null;
@@ -303,6 +305,9 @@ private static void WriteMessageCore(ref MessagePackWriter writer, ServiceMessag
303305
case ServiceMappingMessage serviceMappingMessage:
304306
WriteServiceMappingMessage(ref writer, serviceMappingMessage);
305307
break;
308+
case ConnectionFlowControlMessage connectionFlowControlMessage:
309+
WriteConnectionFlowControlMessage(ref writer, connectionFlowControlMessage);
310+
break;
306311
default:
307312
throw new InvalidDataException($"Unexpected message type: {message.GetType().Name}");
308313
}
@@ -700,6 +705,16 @@ private static void WriteServiceMappingMessage(ref MessagePackWriter writer, Ser
700705
message.WriteExtensionMembers(ref writer);
701706
}
702707

708+
private static void WriteConnectionFlowControlMessage(ref MessagePackWriter writer, ConnectionFlowControlMessage message)
709+
{
710+
writer.WriteArrayHeader(5);
711+
writer.Write(ServiceProtocolConstants.ConnectionFlowControlMessageType);
712+
writer.Write(message.ConnectionId);
713+
writer.WriteInt32((int)message.ConnectionType);
714+
writer.WriteInt32((int)message.Operation);
715+
message.WriteExtensionMembers(ref writer);
716+
}
717+
703718
private static void WriteStringArray(ref MessagePackWriter writer, IReadOnlyList<string> array)
704719
{
705720
if (array?.Count > 0)
@@ -1286,6 +1301,39 @@ private static ServiceMappingMessage CreateServiceMappingMessage(ref MessagePack
12861301
return result;
12871302
}
12881303

1304+
private static ConnectionFlowControlMessage CreateConnectionFlowControlMessage(ref MessagePackReader reader, int arrayLength)
1305+
{
1306+
var connectionId = ReadString(ref reader, "connectionId");
1307+
var connectionType = ReadInt32(ref reader, "connectionType");
1308+
var operation = ReadInt32(ref reader, "operation");
1309+
1310+
switch (connectionType)
1311+
{
1312+
case (int)ConnectionType.Client:
1313+
case (int)ConnectionType.Server:
1314+
break;
1315+
default:
1316+
throw new InvalidDataException($"Unsupported connection type: {connectionType}");
1317+
}
1318+
1319+
switch (operation)
1320+
{
1321+
case (int)ConnectionFlowControlOperation.Pause:
1322+
case (int)ConnectionFlowControlOperation.PauseAck:
1323+
case (int)ConnectionFlowControlOperation.Resume:
1324+
case (int)ConnectionFlowControlOperation.Offline:
1325+
break;
1326+
default:
1327+
throw new InvalidDataException($"Unsupported operation: {operation}");
1328+
}
1329+
1330+
var result = new ConnectionFlowControlMessage(
1331+
connectionId,
1332+
(ConnectionFlowControlOperation)operation,
1333+
(ConnectionType)connectionType);
1334+
return result;
1335+
}
1336+
12891337
private static Claim[] ReadClaims(ref MessagePackReader reader)
12901338
{
12911339
var claimCount = ReadMapLength(ref reader, "claims");

src/Microsoft.Azure.SignalR.Protocols/ServiceProtocolConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,6 @@ public static class ServiceProtocolConstants
4343
public const int ErrorCompletionMessageType = 36;
4444
public const int ServiceMappingMessageType = 37;
4545
public const int ConnectionReconnectMessageType = 38;
46+
public const int ConnectionFlowControlMessageType = 39;
4647
}
4748
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using System.IO;
5+
using Microsoft.Azure.SignalR.Protocol;
6+
using Xunit;
7+
8+
namespace Microsoft.Azure.SignalR.Protocols.Tests;
9+
10+
public class ConnectionFlowControlMessageFacts
11+
{
12+
[Theory]
13+
[InlineData(0, 0)]
14+
[InlineData((int)ConnectionType.Server, 0)]
15+
[InlineData(0, (int)ConnectionFlowControlOperation.Offline)]
16+
public void TestThrowsInvalidDataException(int connectionType, int operation)
17+
{
18+
var message = new ConnectionFlowControlMessage(
19+
"conn1",
20+
(ConnectionFlowControlOperation)operation,
21+
(ConnectionType)connectionType);
22+
var protocol = new ServiceProtocol();
23+
var bytes = protocol.GetMessageBytes(message);
24+
25+
var seq = new System.Buffers.ReadOnlySequence<byte>(bytes);
26+
Assert.Throws<InvalidDataException>(() => protocol.TryParseMessage(ref seq, out var result));
27+
}
28+
}

test/Microsoft.Azure.SignalR.Protocols.Tests/ServiceMessageEqualityComparer.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public bool Equals(ServiceMessage x, ServiceMessage y)
102102
return ErrorCompletionMessageEqual(errorCompletionMessage, (ErrorCompletionMessage)y);
103103
case ServiceMappingMessage serviceMappingMessage:
104104
return ServiceMappingMessageEqual(serviceMappingMessage, (ServiceMappingMessage)y);
105+
case ConnectionFlowControlMessage connectionFlowControlMessage:
106+
return ConnectionFlowControlMessageEqual(connectionFlowControlMessage, (ConnectionFlowControlMessage)y);
105107
default:
106108
throw new InvalidOperationException($"Unknown message type: {x.GetType().FullName}");
107109
}
@@ -386,6 +388,13 @@ private bool ServiceMappingMessageEqual(ServiceMappingMessage x, ServiceMappingM
386388
StringEqual(x.InstanceId, y.InstanceId);
387389
}
388390

391+
private bool ConnectionFlowControlMessageEqual(ConnectionFlowControlMessage x, ConnectionFlowControlMessage y)
392+
{
393+
return StringEqual(x.ConnectionId, y.ConnectionId) &&
394+
Equals(x.ConnectionType, y.ConnectionType) &&
395+
Equals(x.Operation, y.Operation);
396+
}
397+
389398
private static bool StringEqual(string x, string y)
390399
{
391400
return string.Equals(x, y, StringComparison.Ordinal);

test/Microsoft.Azure.SignalR.Protocols.Tests/ServiceProtocolFacts.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ public static IEnumerable<object[]> TestParseOldData
283283
}.ToDictionary(t => t.Name);
284284

285285
#pragma warning disable CS0618 // Type or member is obsolete
286+
286287
public static IDictionary<string, ProtocolTestData> TestData => new[]
287288
{
288289
new ProtocolTestData(
@@ -693,7 +694,16 @@ public static IEnumerable<object[]> TestParseOldData
693694
name: "ServiceMappingMessage",
694695
message: new ServiceMappingMessage("invocationId", "conn1", "instance1"),
695696
binary: "lSWsaW52b2NhdGlvbklkpWNvbm4xqWluc3RhbmNlMYA="),
697+
new ProtocolTestData(
698+
name: nameof(ConnectionFlowControlMessage) + "-1",
699+
message: new ConnectionFlowControlMessage("conn1", ConnectionFlowControlOperation.Pause, ConnectionType.Client),
700+
binary: "lSelY29ubjHSAAAAAdIAAAABgA=="),
701+
new ProtocolTestData(
702+
name: nameof(ConnectionFlowControlMessage) + "-2",
703+
message: new ConnectionFlowControlMessage("conn2", ConnectionFlowControlOperation.Offline, ConnectionType.Server),
704+
binary: "lSelY29ubjLSAAAAAtIAAAAEgA=="),
696705
}.ToDictionary(t => t.Name);
706+
697707
#pragma warning restore CS0618 // Type or member is obsolete
698708

699709
[Theory]
@@ -840,7 +850,9 @@ private static ServiceMessage ParseServiceMessage(byte[] bytes)
840850
public class ProtocolTestData
841851
{
842852
public string Name { get; private set; }
853+
843854
public string Binary { get; private set; }
855+
844856
public ServiceMessage Message { get; private set; }
845857

846858
public ProtocolTestData(string name, ServiceMessage message, string binary)

0 commit comments

Comments
 (0)