Skip to content

Commit 6448783

Browse files
authored
Reduce usage of RunContinuationsAsynchronously (#2478)
1 parent aab5b2b commit 6448783

File tree

9 files changed

+36
-16
lines changed

9 files changed

+36
-16
lines changed

src/Grpc.Net.Client/Internal/GrpcCall.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,16 @@ internal sealed partial class GrpcCall<TRequest, TResponse> : GrpcCall, IGrpcCal
6161
public HttpContentClientStreamWriter<TRequest, TResponse>? ClientStreamWriter { get; private set; }
6262
public HttpContentClientStreamReader<TRequest, TResponse>? ClientStreamReader { get; private set; }
6363

64-
public GrpcCall(Method<TRequest, TResponse> method, GrpcMethodInfo grpcMethodInfo, CallOptions options, GrpcChannel channel, int attemptCount)
64+
public GrpcCall(Method<TRequest, TResponse> method, GrpcMethodInfo grpcMethodInfo, CallOptions options, GrpcChannel channel, int attemptCount, bool forceAsyncHttpResponse)
6565
: base(options, channel)
6666
{
6767
// Validate deadline before creating any objects that require cleanup
6868
ValidateDeadline(options.Deadline);
6969

7070
_callCts = new CancellationTokenSource();
71-
_httpResponseTcs = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
71+
// Retries and hedging can run multiple calls at the same time and use locking for thread-safety.
72+
// Running HTTP response continuation asynchronously is required for locking to work correctly.
73+
_httpResponseTcs = new TaskCompletionSource<HttpResponseMessage>(forceAsyncHttpResponse ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
7274
// Run the callTcs continuation immediately to keep the same context. Required for Activity.
7375
_callTcs = new TaskCompletionSource<Status>();
7476
Method = method;
@@ -142,7 +144,10 @@ public void StartDuplexStreaming()
142144

143145
internal void StartUnaryCore(HttpContent content)
144146
{
145-
_responseTcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
147+
// Not created with RunContinuationsAsynchronously to avoid unnecessary dispatch to the thread pool.
148+
// The TCS is set from RunCall but it is the last operation before the method exits so there shouldn't
149+
// be an impact from running the response continutation synchronously.
150+
_responseTcs = new TaskCompletionSource<TResponse>();
146151

147152
var timeout = GetTimeout();
148153
var message = CreateHttpRequestMessage(timeout);
@@ -161,7 +166,10 @@ internal void StartServerStreamingCore(HttpContent content)
161166

162167
internal void StartClientStreamingCore(HttpContentClientStreamWriter<TRequest, TResponse> clientStreamWriter, HttpContent content)
163168
{
164-
_responseTcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
169+
// Not created with RunContinuationsAsynchronously to avoid unnecessary dispatch to the thread pool.
170+
// The TCS is set from RunCall but it is the last operation before the method exits so there shouldn't
171+
// be an impact from running the response continutation synchronously.
172+
_responseTcs = new TaskCompletionSource<TResponse>();
165173

166174
var timeout = GetTimeout();
167175
var message = CreateHttpRequestMessage(timeout);
@@ -431,9 +439,6 @@ private void CancelCall(Status status)
431439
// Cancellation will also cause reader/writer to throw if used afterwards.
432440
_callCts.Cancel();
433441

434-
// Ensure any logic that is waiting on the HttpResponse is unstuck.
435-
_httpResponseTcs.TrySetCanceled();
436-
437442
// Cancellation token won't send RST_STREAM if HttpClient.SendAsync is complete.
438443
// Dispose HttpResponseMessage to send RST_STREAM to server for in-progress calls.
439444
HttpResponse?.Dispose();
@@ -652,6 +657,9 @@ private async Task RunCall(HttpRequestMessage request, TimeSpan? timeout)
652657
// Verify that FinishCall is called in every code path of this method.
653658
// Should create an "Unassigned variable" compiler error if not set.
654659
Debug.Assert(finished);
660+
// Should be completed before exiting.
661+
Debug.Assert(_httpResponseTcs.Task.IsCompleted);
662+
Debug.Assert(_responseTcs == null || _responseTcs.Task.IsCompleted);
655663
}
656664
}
657665

src/Grpc.Net.Client/Internal/HttpClientCallInvoker.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private static IGrpcCall<TRequest, TResponse> CreateRootGrpcCall<TRequest, TResp
177177
{
178178
// No retry/hedge policy configured. Fast path!
179179
// Note that callWrapper is null here and will be set later.
180-
return CreateGrpcCall<TRequest, TResponse>(channel, method, options, attempt: 1, callWrapper: null);
180+
return CreateGrpcCall<TRequest, TResponse>(channel, method, options, attempt: 1, forceAsyncHttpResponse: false, callWrapper: null);
181181
}
182182
}
183183

@@ -210,14 +210,15 @@ public static GrpcCall<TRequest, TResponse> CreateGrpcCall<TRequest, TResponse>(
210210
Method<TRequest, TResponse> method,
211211
CallOptions options,
212212
int attempt,
213+
bool forceAsyncHttpResponse,
213214
object? callWrapper)
214215
where TRequest : class
215216
where TResponse : class
216217
{
217218
ObjectDisposedThrowHelper.ThrowIf(channel.Disposed, typeof(GrpcChannel));
218219

219220
var methodInfo = channel.GetCachedGrpcMethodInfo(method);
220-
var call = new GrpcCall<TRequest, TResponse>(method, methodInfo, options, channel, attempt);
221+
var call = new GrpcCall<TRequest, TResponse>(method, methodInfo, options, channel, attempt, forceAsyncHttpResponse);
221222
call.CallWrapper = callWrapper;
222223

223224
return call;

src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private async Task StartCall(Action<GrpcCall<TRequest, TResponse>> startCallFunc
7575

7676
OnStartingAttempt();
7777

78-
call = HttpClientCallInvoker.CreateGrpcCall<TRequest, TResponse>(Channel, Method, Options, AttemptCount, CallWrapper);
78+
call = HttpClientCallInvoker.CreateGrpcCall<TRequest, TResponse>(Channel, Method, Options, AttemptCount, forceAsyncHttpResponse: true, CallWrapper);
7979
_activeCalls.Add(call);
8080

8181
startCallFunc(call);

src/Grpc.Net.Client/Internal/Retry/RetryCall.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private async Task StartRetry(Action<GrpcCall<TRequest, TResponse>> startCallFun
109109
// Start new call.
110110
OnStartingAttempt();
111111

112-
currentCall = _activeCall = HttpClientCallInvoker.CreateGrpcCall<TRequest, TResponse>(Channel, Method, Options, AttemptCount, CallWrapper);
112+
currentCall = _activeCall = HttpClientCallInvoker.CreateGrpcCall<TRequest, TResponse>(Channel, Method, Options, AttemptCount, forceAsyncHttpResponse: true, CallWrapper);
113113
startCallFunc(currentCall);
114114

115115
SetNewActiveCallUnsynchronized(currentCall);

src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ internal abstract partial class RetryCallBase<TRequest, TResponse> : IGrpcCall<T
3737
private Task<TResponse>? _responseTask;
3838
private Task<Metadata>? _responseHeadersTask;
3939
private TRequest? _request;
40+
private bool _commitStarted;
4041

4142
// Internal for unit testing.
4243
internal CancellationTokenRegistration? _ctsRegistration;
@@ -369,8 +370,11 @@ protected void CommitCall(IGrpcCall<TRequest, TResponse> call, CommitReason comm
369370
{
370371
lock (Lock)
371372
{
372-
if (!CommitedCallTask.IsCompletedSuccessfully())
373+
if (!_commitStarted)
373374
{
375+
// Specify that call is commiting. This is to prevent any chance of re-entrancy from logic run in OnCommitCall.
376+
_commitStarted = true;
377+
374378
// The buffer size is verified in unit tests after calls are completed.
375379
// Clear the buffer before commiting call.
376380
ClearRetryBuffer();

test/Grpc.Net.Client.Tests/CancellationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public async Task AsyncClientStreamingCall_CancellationDuringSend_ThrowOperation
113113

114114
cts.Cancel();
115115

116-
var ex = await ExceptionAssert.ThrowsAsync<TaskCanceledException>(() => responseHeadersTask).DefaultTimeout();
116+
var ex = await ExceptionAssert.ThrowsAsync<OperationCanceledException>(() => responseHeadersTask).DefaultTimeout();
117117
Assert.AreEqual(StatusCode.Cancelled, call.GetStatus().StatusCode);
118118
Assert.AreEqual("Call canceled by the client.", call.GetStatus().Detail);
119119
}

test/Grpc.Net.Client.Tests/HttpContentClientStreamReaderTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ private static GrpcCall<HelloRequest, HelloReply> CreateGrpcCall(GrpcChannel cha
231231
new GrpcMethodInfo(new GrpcCallScope(ClientTestHelpers.ServiceMethod.Type, uri), uri, methodConfig: null),
232232
new CallOptions(),
233233
channel,
234-
attemptCount: 0);
234+
attemptCount: 0,
235+
forceAsyncHttpResponse: false);
235236
}
236237

237238
private static GrpcChannel CreateChannel(HttpClient httpClient, ILoggerFactory? loggerFactory = null, bool? throwOperationCanceledOnCancellation = null)

test/Grpc.Net.Client.Tests/ResponseHeadersAsyncTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ public async Task AsyncUnaryCall_AuthInterceptorDispose_ResponseHeadersError()
112112
var credentialsSyncPoint = new SyncPoint(runContinuationsAsynchronously: true);
113113
var credentials = CallCredentials.FromInterceptor(async (context, metadata) =>
114114
{
115-
await credentialsSyncPoint.WaitToContinue();
115+
var tcs = new TaskCompletionSource<bool>();
116+
context.CancellationToken.Register(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs);
117+
118+
await Task.WhenAny(credentialsSyncPoint.WaitToContinue(), tcs.Task);
116119
metadata.Add("Authorization", $"Bearer TEST");
117120
});
118121

test/Grpc.Net.Client.Tests/Retry/RetryTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,10 @@ public async Task AsyncUnaryCall_AuthInteceptorDispose_Error()
165165
var credentialsSyncPoint = new SyncPoint(runContinuationsAsynchronously: true);
166166
var credentials = CallCredentials.FromInterceptor(async (context, metadata) =>
167167
{
168-
await credentialsSyncPoint.WaitToContinue();
168+
var tcs = new TaskCompletionSource<bool>();
169+
context.CancellationToken.Register(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs);
170+
171+
await Task.WhenAny(credentialsSyncPoint.WaitToContinue(), tcs.Task);
169172
metadata.Add("Authorization", $"Bearer TEST");
170173
});
171174
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: provider.GetRequiredService<ILoggerFactory>(), serviceConfig: serviceConfig, configure: options => options.Credentials = ChannelCredentials.Create(new SslCredentials(), credentials));

0 commit comments

Comments
 (0)