-
Notifications
You must be signed in to change notification settings - Fork 108
Add ClientResultsManager #1684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ClientResultsManager #1684
Changes from all commits
a3451e6
a33c904
b08557f
96058f5
00348d6
a03f434
79a0567
28dc3c6
6b504cf
f3281af
790b6c0
f594136
9dccae7
a7c8c8a
517dfaa
cfd493b
f1b96c1
7a13169
b129204
67c9783
acc313a
6cad3ee
c73d59c
46f1b4c
ea7b1bb
f4e143c
ee60530
d2a3178
5cd56c4
0e71e48
2137268
8299c2d
d18743a
1eec339
d522182
557de85
ecf28b4
789427d
b7d027e
d48ad23
55b5fb4
8811ca0
7867dd8
c4f7ad7
46eaf5a
c5aca5a
80031be
4d196f0
7f5d42e
e296cb8
15876d6
4fc63e5
f4e9f8f
b97589f
8ec3042
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
|
||
| using System; | ||
| using Microsoft.AspNetCore.SignalR; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal sealed class DummyClientInvocationManager : IClientInvocationManager | ||
| { | ||
| public ICallerClientResultsManager Caller => throw new NotSupportedException(); | ||
| public IRoutedClientResultsManager Router => throw new NotSupportedException(); | ||
|
|
||
| public DummyClientInvocationManager() | ||
| { | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
|
||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Azure.SignalR.Protocol; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal interface ICallerClientResultsManager : IClientResultsManager | ||
| { | ||
| string GenerateInvocationId(string connectionId); | ||
|
|
||
| /// <summary> | ||
| /// Add a invocation which is directly called by current server | ||
| /// </summary> | ||
| /// <typeparam name="T"></typeparam> | ||
| /// <param name="connectionId"></param> | ||
| /// <param name="invocationId"></param> | ||
| /// <param name="instanceId"> The InstanceId of target client the caller server knows when this method is called. If the target client is managed by the caller server, the caller server knows the InstanceId of target client and this parameter is not null. Otherwise, this parameter is null. </param> | ||
| /// <param name="cancellationToken"></param> | ||
| /// <returns></returns> | ||
| Task<T> AddInvocation<T>(string connectionId, string invocationId, string instanceId, CancellationToken cancellationToken); | ||
|
|
||
| void AddServiceMapping(ServiceMappingMessage serviceMappingMessage); | ||
|
|
||
| void CleanupInvocationsByInstance(string instanceId); | ||
|
|
||
| bool TryCompleteResult(string connectionId, ClientCompletionMessage message); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal interface IClientInvocationManager | ||
| { | ||
| ICallerClientResultsManager Caller { get; } | ||
| IRoutedClientResultsManager Router { get; } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
|
||
| using System; | ||
| using Microsoft.AspNetCore.SignalR.Protocol; | ||
| using Microsoft.Azure.SignalR.Protocol; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal interface IClientResultsManager | ||
| { | ||
| bool TryCompleteResult(string connectionId, CompletionMessage message); | ||
|
|
||
| bool TryGetInvocationReturnType(string invocationId, out Type type); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
|
||
| using System.Threading; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal interface IRoutedClientResultsManager : IClientResultsManager | ||
| { | ||
| void AddInvocation(string connectionId, string invocationId, string callerServerId, CancellationToken cancellationToken); | ||
|
|
||
| bool ContainsInvocation(string invocationId); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is no need? you can directly use TryGetInvocationReturnType to check if it contains Invocation?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
|
|
||
| void CleanupInvocationsByConnection(string connectionId); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
| #if NET7_0_OR_GREATER | ||
| using System; | ||
| using System.Diagnostics; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using System.Collections.Generic; | ||
| using System.Collections.Concurrent; | ||
| using Microsoft.Azure.SignalR.Protocol; | ||
xingsy97 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| using Microsoft.AspNetCore.SignalR.Protocol; | ||
| using Microsoft.AspNetCore.SignalR; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal sealed class CallerClientResultsManager : ICallerClientResultsManager, IInvocationBinder | ||
| { | ||
| private readonly ConcurrentDictionary<string, PendingInvocation> _pendingInvocations = new(); | ||
| private readonly string _clientResultManagerId = Guid.NewGuid().ToString("N"); | ||
| private long _lastInvocationId = 0; | ||
|
|
||
| private readonly IHubProtocolResolver _hubProtocolResolver; | ||
|
|
||
| public CallerClientResultsManager(IHubProtocolResolver hubProtocolResolver) | ||
| { | ||
| _hubProtocolResolver = hubProtocolResolver ?? throw new ArgumentNullException(nameof(hubProtocolResolver)); | ||
| } | ||
|
|
||
| public string GenerateInvocationId(string connectionId) | ||
| { | ||
| return $"{connectionId}-{_clientResultManagerId}-{Interlocked.Increment(ref _lastInvocationId)}"; | ||
| } | ||
|
|
||
| public Task<T> AddInvocation<T>(string connectionId, string invocationId, string instanceId, CancellationToken cancellationToken) | ||
| { | ||
| var tcs = new TaskCompletionSourceWithCancellation<T>( | ||
| cancellationToken, | ||
| () => TryCompleteResult(connectionId, CompletionMessage.WithError(invocationId, "Canceled"))); | ||
|
|
||
| // When the caller server is also the client router, Azure SignalR service won't send a ServiceMappingMessage to server. | ||
| // To handle this condition, CallerClientResultsManager itself should record this mapping information rather than waiting for a ServiceMappingMessage sent by service. Only in this condition, this method is called with instanceId != null. | ||
| var result = _pendingInvocations.TryAdd(invocationId, | ||
| new PendingInvocation( | ||
| typeof(T), connectionId, tcs, | ||
| static (state, completionMessage) => | ||
| { | ||
| var tcs = (TaskCompletionSourceWithCancellation<T>)state; | ||
| if (completionMessage.HasResult) | ||
| { | ||
| tcs.TrySetResult((T)completionMessage.Result); | ||
| } | ||
| else | ||
| { | ||
| tcs.TrySetException(new Exception(completionMessage.Error)); | ||
| } | ||
| }) { RouterInstanceId = instanceId } | ||
| ); | ||
| Debug.Assert(result); | ||
|
|
||
| tcs.RegisterCancellation(); | ||
|
|
||
| return tcs.Task; | ||
| } | ||
|
|
||
| public void AddServiceMapping(ServiceMappingMessage serviceMappingMessage) | ||
| { | ||
| if (_pendingInvocations.TryGetValue(serviceMappingMessage.InvocationId, out var invocation)) | ||
| { | ||
| if (invocation.RouterInstanceId == null) | ||
| { | ||
| invocation.RouterInstanceId = serviceMappingMessage.InstanceId; | ||
| } | ||
| else | ||
| { | ||
| // do nothing | ||
| } | ||
| } | ||
| else | ||
| { | ||
| // do nothing | ||
| } | ||
| } | ||
|
|
||
| public void CleanupInvocationsByInstance(string instanceId) | ||
| { | ||
| foreach (var (invocationId, invocation) in _pendingInvocations) | ||
| { | ||
| if (invocation.RouterInstanceId == instanceId) | ||
| { | ||
| var message = new CompletionMessage(invocationId, $"Connection '{invocation.ConnectionId}' is disconnected.", null, false); | ||
|
|
||
| invocation.Complete(invocation.Tcs, message); | ||
xingsy97 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _pendingInvocations.TryRemove(invocationId, out _); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public bool TryCompleteResult(string connectionId, CompletionMessage message) | ||
| { | ||
| if (_pendingInvocations.TryGetValue(message.InvocationId, out var item)) | ||
| { | ||
| if (item.ConnectionId != connectionId) | ||
| { | ||
| // Follow https://github.com/dotnet/aspnetcore/blob/main/src/SignalR/common/Shared/ClientResultsManager.cs#L58 | ||
| throw new InvalidOperationException($"Connection ID '{connectionId}' is not valid for invocation ID '{message.InvocationId}'."); | ||
| } | ||
|
|
||
| // if false the connection disconnected right after the above TryGetValue | ||
| // or someone else completed the invocation (likely a bad client) | ||
| // we'll ignore both cases | ||
| if (_pendingInvocations.TryRemove(message.InvocationId, out _)) | ||
| { | ||
| item.Complete(item.Tcs, message); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| // connection was disconnected or someone else completed the invocation | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| public bool TryCompleteResult(string connectionId, ClientCompletionMessage message) | ||
| { | ||
| var protocol = _hubProtocolResolver.GetProtocol(message.Protocol, null); | ||
| if (protocol == null) | ||
| { | ||
| var errorMessage = $"Not supported protocol {message.Protocol} by server."; | ||
| return TryCompleteResult(connectionId, new CompletionMessage(message.InvocationId, errorMessage, null, false)); | ||
| } | ||
|
|
||
| var payload = message.Payload; | ||
| if (protocol.TryParseMessage(ref payload, this, out var hubMessage)) | ||
| { | ||
| if (hubMessage is CompletionMessage completionMessage) | ||
| { | ||
| return TryCompleteResult(connectionId, completionMessage); | ||
| } | ||
| else | ||
| { | ||
| throw new InvalidOperationException($"The payload of ClientCompletionMessage whose type is {hubMessage.GetType().Name} cannot be parsed into CompletionMessage correctly."); | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| // Implemented for interface IInvocationBinder | ||
| public Type GetReturnType(string invocationId) | ||
| { | ||
| if (TryGetInvocationReturnType(invocationId, out var type)) | ||
| { | ||
| return type; | ||
| } | ||
| // This exception will be handled by https://github.com/dotnet/aspnetcore/blob/f96dce6889fe67aaed33f0c2b147b8b537358f1e/src/SignalR/common/Shared/TryGetReturnType.cs#L14 with a silent failure. The user won't be interrupted. | ||
| throw new InvalidOperationException($"Invocation ID '{invocationId}' is not associated with a pending client result."); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that invocation is already removed? What happens to the customer when it throws? |
||
| } | ||
|
|
||
| public bool TryGetInvocationReturnType(string invocationId, out Type type) | ||
| { | ||
| if (_pendingInvocations.TryGetValue(invocationId, out var item)) | ||
| { | ||
| type = item.Type; | ||
| return true; | ||
| } | ||
| type = null; | ||
| return false; | ||
| } | ||
|
|
||
| // Unused, here to honor the IInvocationBinder interface but should never be called | ||
| public IReadOnlyList<Type> GetParameterTypes(string methodName) => throw new NotImplementedException(); | ||
|
|
||
| // Unused, here to honor the IInvocationBinder interface but should never be called | ||
| public Type GetStreamItemType(string streamId) => throw new NotImplementedException(); | ||
|
|
||
| private record PendingInvocation(Type Type, string ConnectionId, object Tcs, Action<object, CompletionMessage> Complete) | ||
| { | ||
| public string RouterInstanceId { get; set; } | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| // Copyright (c) Microsoft. All rights reserved. | ||
| // Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
| #if NET7_0_OR_GREATER | ||
| using System; | ||
| using Microsoft.AspNetCore.SignalR; | ||
|
|
||
| namespace Microsoft.Azure.SignalR | ||
| { | ||
| internal sealed class ClientInvocationManager : IClientInvocationManager | ||
| { | ||
| public ICallerClientResultsManager Caller { get; } | ||
| public IRoutedClientResultsManager Router { get; } | ||
|
|
||
| public ClientInvocationManager(IHubProtocolResolver hubProtocolResolver) | ||
| { | ||
| Caller = new CallerClientResultsManager(hubProtocolResolver ?? throw new ArgumentNullException(nameof(hubProtocolResolver))); | ||
| Router = new RoutedClientResultsManager(); | ||
| } | ||
| } | ||
| } | ||
| #endif |
Uh oh!
There was an error while loading. Please reload this page.