Skip to content

Commit 2f63cd3

Browse files
committed
To implement group member query API in persistent mode, add group member query message protocol.
* Add a mechanism to respond with binary payload from service. * Move some util method in `ServiceProtocol` to a separate util class `MessagePackUtils`, so that they could be reused by other classes. * Deprecate `string message` member in `AckMessage` and add `Payload` member. The `string message` member is never used. * Introduce `IMessagePackSerializable` interface, so that we could put the (de)serialization methods of model classes inside themselves. * Refactor `AckHandler` to allow acking with binary payload * Add `GroupMemberQueryMessage` in service protocol. * Add models `GroupMember` and `GroupMemberQueryResponsePayload`.
1 parent 2df6718 commit 2f63cd3

File tree

11 files changed

+464
-345
lines changed

11 files changed

+464
-345
lines changed

src/Microsoft.Azure.SignalR.Common/ServiceConnections/ServiceConnectionContainerBase.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public virtual Task HandlePingAsync(PingMessage pingMessage)
221221

222222
public void HandleAck(AckMessage ackMessage)
223223
{
224-
_ackHandler.TriggerAck(ackMessage.AckId, (AckStatus)ackMessage.Status);
224+
_ackHandler.TriggerAck(ackMessage.AckId, (AckStatus)ackMessage.Status, ackMessage.Payload);
225225
}
226226

227227
public virtual Task WriteAsync(ServiceMessage serviceMessage)
@@ -249,6 +249,29 @@ public async Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage,
249249
return AckHandler.HandleAckStatus(ackableMessage, status);
250250
}
251251

252+
/// <summary>
253+
/// <see cref="WriteAckableMessageAsync(ServiceMessage, CancellationToken)"/> only checks <see cref="AckMessage.Status"/> as the response,
254+
/// while this method checks <see cref="AckMessage.Payload"/> and deserialize it to <typeparamref name="T"/>.
255+
/// </summary>
256+
public async Task<T> InvokeAsync<T>(ServiceMessage serviceMessage, CancellationToken cancellationToken = default) where T : IMessagePackSerializable, new()
257+
{
258+
if (serviceMessage is not IAckableMessage ackableMessage)
259+
{
260+
throw new ArgumentException($"{nameof(serviceMessage)} is not {nameof(IAckableMessage)}");
261+
}
262+
263+
var task = _ackHandler.CreateSingleAck<T>(out var id, null, cancellationToken);
264+
ackableMessage.AckId = id;
265+
266+
// Sending regular messages completes as soon as the data leaves the outbound pipe,
267+
// whereas ackable ones complete upon full roundtrip of the message and the ack (or timeout).
268+
// Therefore sending them over different connections creates a possibility for processing them out of original order.
269+
// By sending both message types over the same connection we ensure that they are sent (and processed) in their original order.
270+
await WriteMessageAsync(serviceMessage);
271+
272+
return await task;
273+
}
274+
252275
public virtual Task OfflineAsync(GracefulShutdownMode mode, CancellationToken token)
253276
{
254277
_terminated = true;

src/Microsoft.Azure.SignalR.Common/Utilities/AckHandler.cs

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
using System;
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using System;
5+
using System.Buffers;
26
using System.Collections.Concurrent;
37
using System.Threading;
48
using System.Threading.Tasks;
9+
using MessagePack;
510
using Microsoft.Azure.SignalR.Common;
611
using Microsoft.Azure.SignalR.Protocol;
712

@@ -35,15 +40,27 @@ public Task<AckStatus> CreateSingleAck(out int id, TimeSpan? ackTimeout = defaul
3540
{
3641
return Task.FromResult(AckStatus.Ok);
3742
}
38-
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new SingleAckInfo(ackTimeout ?? _defaultAckTimeout));
39-
if (info is MultiAckInfo)
43+
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new SingleAckWithStatusInfo(ackTimeout ?? _defaultAckTimeout));
44+
if (info is MultiAckWithStatusInfo)
4045
{
4146
throw new InvalidOperationException();
4247
}
4348
cancellationToken.Register(() => info.Cancel());
4449
return info.Task;
4550
}
4651

52+
public Task<T> CreateSingleAck<T>(out int id, TimeSpan? ackTimeout = default, CancellationToken cancellationToken = default) where T : IMessagePackSerializable, new()
53+
{
54+
id = NextId();
55+
if (_disposed)
56+
{
57+
return Task.FromResult(new T());
58+
}
59+
var info = (IAckInfo<IMessagePackSerializable>)_acks.GetOrAdd(id, _ => new SingleAckWithMessagePackPayloadInfo<T>(ackTimeout ?? _defaultAckTimeout));
60+
cancellationToken.Register(info.Cancel);
61+
return info.Task.ContinueWith(task => (T)task.Result);
62+
}
63+
4764
public static bool HandleAckStatus(IAckableMessage message, AckStatus status)
4865
{
4966
return status switch
@@ -62,29 +79,19 @@ public Task<AckStatus> CreateMultiAck(out int id, TimeSpan? ackTimeout = default
6279
{
6380
return Task.FromResult(AckStatus.Ok);
6481
}
65-
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new MultiAckInfo(ackTimeout ?? _defaultAckTimeout));
66-
if (info is SingleAckInfo)
82+
var info = (IAckInfo<AckStatus>)_acks.GetOrAdd(id, _ => new MultiAckWithStatusInfo(ackTimeout ?? _defaultAckTimeout));
83+
if (info is SingleAckInfo<AckStatus>)
6784
{
6885
throw new InvalidOperationException();
6986
}
7087
return info.Task;
7188
}
7289

73-
public void TriggerAck(int id, AckStatus status = AckStatus.Ok)
90+
public void TriggerAck(int id, AckStatus status = AckStatus.Ok, ReadOnlySequence<byte>? payload = default)
7491
{
75-
if (_acks.TryGetValue(id, out var info))
92+
if (_acks.TryGetValue(id, out var info) && info.Ack(status, payload))
7693
{
77-
switch (info)
78-
{
79-
case IAckInfo<AckStatus> ackInfo:
80-
if (ackInfo.Ack(status))
81-
{
82-
_acks.TryRemove(id, out _);
83-
}
84-
break;
85-
default:
86-
throw new InvalidCastException($"Expected: IAckInfo<{typeof(IAckInfo<AckStatus>).Name}>, actual type: {info.GetType().Name}");
87-
}
94+
_acks.TryRemove(id, out _);
8895
}
8996
}
9097

@@ -125,11 +132,11 @@ private void CheckAcks()
125132
{
126133
if (_acks.TryRemove(id, out _))
127134
{
128-
if (ack is SingleAckInfo singleAckInfo)
135+
if (ack is SingleAckInfo<AckStatus> singleAckInfo)
129136
{
130137
singleAckInfo.Ack(AckStatus.Timeout);
131138
}
132-
else if (ack is MultiAckInfo multipleAckInfo)
139+
else if (ack is MultiAckWithStatusInfo multipleAckInfo)
133140
{
134141
multipleAckInfo.ForceAck(AckStatus.Timeout);
135142
}
@@ -170,39 +177,57 @@ private interface IAckInfo
170177
{
171178
DateTime TimeoutAt { get; }
172179
void Cancel();
180+
bool Ack(AckStatus status, ReadOnlySequence<byte>? payload = null);
173181
}
174182

175183
private interface IAckInfo<T> : IAckInfo
176184
{
177185
Task<T> Task { get; }
178-
bool Ack(T status);
179186
}
180187

181188
public interface IMultiAckInfo
182189
{
183190
bool SetExpectedCount(int expectedCount);
184191
}
185192

186-
private sealed class SingleAckInfo : IAckInfo<AckStatus>
193+
private abstract class SingleAckInfo<T> : IAckInfo<T>
187194
{
188-
public readonly TaskCompletionSource<AckStatus> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
189-
195+
public readonly TaskCompletionSource<T> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
190196
public DateTime TimeoutAt { get; }
191-
192197
public SingleAckInfo(TimeSpan timeout)
193198
{
194199
TimeoutAt = DateTime.UtcNow + timeout;
195200
}
201+
public abstract bool Ack(AckStatus status, ReadOnlySequence<byte>? payload = null);
202+
public Task<T> Task => _tcs.Task;
203+
public void Cancel() => _tcs.TrySetCanceled();
204+
}
196205

197-
public bool Ack(AckStatus status = AckStatus.Ok) =>
198-
_tcs.TrySetResult(status);
206+
private class SingleAckWithStatusInfo : SingleAckInfo<AckStatus>
207+
{
199208

200-
public Task<AckStatus> Task => _tcs.Task;
209+
public SingleAckWithStatusInfo(TimeSpan timeout) : base(timeout) { }
201210

202-
public void Cancel() => _tcs.TrySetCanceled();
211+
public override bool Ack(AckStatus status, ReadOnlySequence<byte>? payload = null) =>
212+
_tcs.TrySetResult(status);
213+
}
214+
215+
private sealed class SingleAckWithMessagePackPayloadInfo<T> : SingleAckInfo<IMessagePackSerializable> where T : IMessagePackSerializable, new()
216+
{
217+
public SingleAckWithMessagePackPayloadInfo(TimeSpan timeout) : base(timeout) { }
218+
public override bool Ack(AckStatus status, ReadOnlySequence<byte>? payload = null)
219+
{
220+
if (payload == null)
221+
{
222+
throw new ArgumentNullException(nameof(payload));
223+
}
224+
var reader = new MessagePackReader(payload.Value);
225+
var result = reader.Deserialize<T>(string.Empty);
226+
return _tcs.TrySetResult(result);
227+
}
203228
}
204229

205-
private sealed class MultiAckInfo : IAckInfo<AckStatus>, IMultiAckInfo
230+
private sealed class MultiAckWithStatusInfo : IAckInfo<AckStatus>, IMultiAckInfo
206231
{
207232
public readonly TaskCompletionSource<AckStatus> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
208233

@@ -211,7 +236,7 @@ private sealed class MultiAckInfo : IAckInfo<AckStatus>, IMultiAckInfo
211236

212237
public DateTime TimeoutAt { get; }
213238

214-
public MultiAckInfo(TimeSpan timeout)
239+
public MultiAckWithStatusInfo(TimeSpan timeout)
215240
{
216241
TimeoutAt = DateTime.UtcNow + timeout;
217242
}
@@ -239,7 +264,7 @@ public bool SetExpectedCount(int expectedCount)
239264
return result;
240265
}
241266

242-
public bool Ack(AckStatus status = AckStatus.Ok)
267+
public bool Ack(AckStatus status = AckStatus.Ok, ReadOnlySequence<byte>? payload = null)
243268
{
244269
bool result;
245270
lock (_tcs)

src/Microsoft.Azure.SignalR.Management/HubContext/GroupManager.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4+
using System;
5+
using System.Collections.Generic;
46
using System.Threading;
57
using System.Threading.Tasks;
68
using Microsoft.AspNetCore.SignalR;
9+
using Microsoft.Azure.SignalR.Protocol;
710

811
namespace Microsoft.Azure.SignalR.Management
912
{
@@ -14,5 +17,7 @@ public abstract class GroupManager : IGroupManager
1417
public abstract Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);
1518

1619
public abstract Task RemoveFromAllGroupsAsync(string connectionId, CancellationToken cancellationToken = default);
20+
21+
public virtual IAsyncEnumerable<GroupMember> ListConnectionsInGroup(string groupName, int? max, CancellationToken token) => throw new NotImplementedException();
1722
}
1823
}

0 commit comments

Comments
 (0)