Skip to content

Commit f90bb60

Browse files
authored
[browser] WS & HTTP clients more async (#95483)
1 parent 5004872 commit f90bb60

File tree

9 files changed

+193
-68
lines changed

9 files changed

+193
-68
lines changed

src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,8 +1391,20 @@ await server.AcceptConnectionAsync(async connection =>
13911391
{
13921392
await connection.ReadRequestDataAsync();
13931393
tcs2.SetResult(true);
1394-
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
1395-
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
1394+
try
1395+
{
1396+
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
1397+
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
1398+
}
1399+
catch (IOException ex)
1400+
{
1401+
// when testing in the browser, we are using the WebSocket for the loopback
1402+
// it could get disconnected after the cancellation above, earlier than the server-side gets chance to write the response
1403+
if (!(ex.InnerException is InvalidOperationException ivd) || !ivd.Message.Contains("The WebSocket is not connected"))
1404+
{
1405+
throw;
1406+
}
1407+
}
13961408
await tcs.Task;
13971409
});
13981410
});

src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessa
334334
{
335335
bool? allowAutoRedirect = _isAllowAutoRedirectTouched ? AllowAutoRedirect : null;
336336
#if FEATURE_WASM_THREADS
337-
return JSHost.CurrentOrMainJSSynchronizationContext.Send(() =>
337+
return JSHost.CurrentOrMainJSSynchronizationContext.Post(() =>
338338
{
339339
#endif
340340
return Impl(request, cancellationToken, allowAutoRedirect);
@@ -365,7 +365,7 @@ private Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cance
365365
{
366366
cancellationToken.ThrowIfCancellationRequested();
367367
#if FEATURE_WASM_THREADS
368-
return _transformStream.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken));
368+
return _transformStream.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken));
369369
#else
370370
return Impl(this, buffer, cancellationToken);
371371
#endif
@@ -474,24 +474,25 @@ public void Dispose()
474474
return;
475475

476476
#if FEATURE_WASM_THREADS
477-
FetchResponse?.SynchronizationContext.Send(static (WasmFetchResponse self) =>
477+
FetchResponse?.SynchronizationContext.Post(static (WasmFetchResponse self) =>
478478
{
479479
lock (self.ThisLock)
480480
{
481-
if (self._isDisposed)
482-
return;
483-
self._isDisposed = true;
484-
self._abortRegistration.Dispose();
485-
self._abortController.Dispose();
486-
if (!self.FetchResponse!.IsDisposed)
481+
if (!self._isDisposed)
487482
{
488-
BrowserHttpInterop.AbortResponse(self.FetchResponse);
483+
self._isDisposed = true;
484+
self._abortRegistration.Dispose();
485+
self._abortController.Dispose();
486+
if (!self.FetchResponse!.IsDisposed)
487+
{
488+
BrowserHttpInterop.AbortResponse(self.FetchResponse);
489+
}
490+
self.FetchResponse.Dispose();
491+
self.FetchResponse = null;
489492
}
490-
self.FetchResponse.Dispose();
491-
self.FetchResponse = null;
493+
return Task.CompletedTask;
492494
}
493495
}, this);
494-
495496
#else
496497
_isDisposed = true;
497498
_abortRegistration.Dispose();
@@ -521,7 +522,7 @@ public BrowserHttpContent(WasmFetchResponse fetchResponse)
521522
_fetchResponse = fetchResponse;
522523
}
523524

524-
// TODO alocate smaller buffer and call multiple times
525+
// TODO allocate smaller buffer and call multiple times
525526
private async ValueTask<byte[]> GetResponseData(CancellationToken cancellationToken)
526527
{
527528
Task<int> promise;
@@ -557,12 +558,13 @@ protected override Task<Stream> CreateContentReadStreamAsync()
557558
{
558559
_fetchResponse.ThrowIfDisposed();
559560
#if FEATURE_WASM_THREADS
560-
return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this));
561+
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this));
561562
#else
562563
return Impl(this);
563564
#endif
564565
static async Task<Stream> Impl(BrowserHttpContent self)
565566
{
567+
self._fetchResponse.ThrowIfDisposed();
566568
byte[] data = await self.GetResponseData(CancellationToken.None).ConfigureAwait(true);
567569
return new MemoryStream(data, writable: false);
568570
}
@@ -576,13 +578,14 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext?
576578
ArgumentNullException.ThrowIfNull(stream, nameof(stream));
577579
_fetchResponse.ThrowIfDisposed();
578580
#if FEATURE_WASM_THREADS
579-
return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, stream, cancellationToken));
581+
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, stream, cancellationToken));
580582
#else
581583
return Impl(this, stream, cancellationToken);
582584
#endif
583585

584586
static async Task Impl(BrowserHttpContent self, Stream stream, CancellationToken cancellationToken)
585587
{
588+
self._fetchResponse.ThrowIfDisposed();
586589
byte[] data = await self.GetResponseData(cancellationToken).ConfigureAwait(true);
587590
await stream.WriteAsync(data, cancellationToken).ConfigureAwait(true);
588591
}
@@ -621,13 +624,14 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
621624
ArgumentNullException.ThrowIfNull(buffer, nameof(buffer));
622625
_fetchResponse.ThrowIfDisposed();
623626
#if FEATURE_WASM_THREADS
624-
return await _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true);
627+
return await _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true);
625628
#else
626629
return await Impl(this, buffer, cancellationToken).ConfigureAwait(true);
627630
#endif
628631

629632
static async Task<int> Impl(WasmHttpReadStream self, Memory<byte> buffer, CancellationToken cancellationToken)
630633
{
634+
self._fetchResponse.ThrowIfDisposed();
631635
Task<int> promise;
632636
using (Buffers.MemoryHandle handle = buffer.Pin())
633637
{

src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ internal Task ConnectAsync(Uri uri, List<string>? requestedSubProtocols, Cancell
120120
throw new InvalidOperationException(SR.net_WebSockets_AlreadyStarted);
121121
}
122122
#if FEATURE_WASM_THREADS
123-
JSHost.CurrentOrMainJSSynchronizationContext!.Send(_ =>
123+
JSHost.CurrentOrMainJSSynchronizationContext.Send(_ =>
124124
{
125125
lock (_thisLock)
126126
{
@@ -130,7 +130,7 @@ internal Task ConnectAsync(Uri uri, List<string>? requestedSubProtocols, Cancell
130130
}
131131
}, null);
132132

133-
return JSHost.CurrentOrMainJSSynchronizationContext.Send(() =>
133+
return JSHost.CurrentOrMainJSSynchronizationContext.Post(() =>
134134
{
135135
return ConnectAsyncCore(cancellationToken);
136136
});
@@ -167,7 +167,7 @@ public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType m
167167
WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));
168168

169169
#if FEATURE_WASM_THREADS
170-
return _innerWebSocket!.SynchronizationContext.Send(() =>
170+
return _innerWebSocket!.SynchronizationContext.Post(() =>
171171
{
172172
Task promise;
173173
lock (_thisLock)
@@ -200,7 +200,7 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
200200
WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));
201201

202202
#if FEATURE_WASM_THREADS
203-
return _innerWebSocket!.SynchronizationContext.Send(() =>
203+
return _innerWebSocket!.SynchronizationContext.Post(() =>
204204
{
205205
Task<WebSocketReceiveResult> promise;
206206
lock (_thisLock)
@@ -228,7 +228,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string?
228228
}
229229

230230
#if FEATURE_WASM_THREADS
231-
return _innerWebSocket!.SynchronizationContext.Send(() =>
231+
return _innerWebSocket!.SynchronizationContext.Post(() =>
232232
{
233233
Task promise;
234234
lock (_thisLock)
@@ -240,7 +240,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string?
240240
{
241241
throw new WebSocketException(WebSocketError.InvalidState, SR.Format(SR.net_WebSockets_InvalidState, state, "Connecting, Open, CloseSent, Aborted"));
242242
}
243-
if(state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted)
243+
if (state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted)
244244
{
245245
return Task.CompletedTask;
246246
}
@@ -268,7 +268,7 @@ public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? status
268268
}
269269

270270
#if FEATURE_WASM_THREADS
271-
return _innerWebSocket!.SynchronizationContext.Send(() =>
271+
return _innerWebSocket!.SynchronizationContext.Post(() =>
272272
{
273273
Task promise;
274274
lock (_thisLock)
@@ -476,13 +476,17 @@ private async Task SendAsyncCore(ArraySegment<byte> buffer, WebSocketMessageType
476476
try
477477
{
478478
var sendTask = BrowserInterop.UnsafeSendSync(_innerWebSocket!, buffer, messageType, endOfMessage);
479-
if (sendTask == null)
479+
if (sendTask != null)
480480
{
481-
// return synchronously
482-
return;
481+
await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true);
483482
}
484-
485-
await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true);
483+
#if FEATURE_WASM_THREADS
484+
// return synchronously, not supported with MT
485+
else
486+
{
487+
Environment.FailFast("BrowserWebSocket.SendAsyncCore: Unexpected synchronous result");
488+
}
489+
#endif
486490
}
487491
catch (JSException ex)
488492
{
@@ -502,19 +506,18 @@ private async Task<WebSocketReceiveResult> ReceiveAsyncCore(ArraySegment<byte> b
502506
using (MemoryHandle pinBuffer = bufferMemory.Pin())
503507
{
504508
var receiveTask = BrowserInterop.ReceiveUnsafeSync(_innerWebSocket!, pinBuffer, bufferMemory.Length);
505-
if (receiveTask == null)
509+
if (receiveTask != null)
506510
{
507-
// return synchronously
508-
#if FEATURE_WASM_THREADS
509-
lock (_thisLock)
510-
{
511-
#endif
512-
return ConvertResponse(this);
511+
await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true);
512+
}
513513
#if FEATURE_WASM_THREADS
514-
} //lock
515-
#endif
514+
// return synchronously, not supported with MT
515+
else
516+
{
517+
Environment.FailFast("BrowserWebSocket.ReceiveAsyncCore: Unexpected synchronous result");
516518
}
517-
await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true);
519+
#endif
520+
518521

519522
#if FEATURE_WASM_THREADS
520523
lock (_thisLock)
@@ -555,8 +558,18 @@ private async Task CloseAsyncCore(WebSocketCloseStatus closeStatus, string? stat
555558
_closeStatus = closeStatus;
556559
_closeStatusDescription = statusDescription;
557560

558-
var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived) ?? Task.CompletedTask;
559-
await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true);
561+
var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived);
562+
if (closeTask != null)
563+
{
564+
await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true);
565+
}
566+
#if FEATURE_WASM_THREADS
567+
// return synchronously, not supported with MT
568+
else
569+
{
570+
Environment.FailFast("BrowserWebSocket.CloseAsyncCore: Unexpected synchronous result");
571+
}
572+
#endif
560573

561574
#if FEATURE_WASM_THREADS
562575
lock (_thisLock)

src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ await Assert.ThrowsAnyAsync<WebSocketException>(async () =>
224224

225225
[OuterLoop("Uses external servers", typeof(PlatformDetection), nameof(PlatformDetection.LocalEchoServerIsNotAvailable))]
226226
[ConditionalTheory(nameof(WebSocketsSupported)), MemberData(nameof(EchoServers))]
227-
[ActiveIssue("https://github.com/dotnet/runtime/issues/83517", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))]
227+
[SkipOnPlatform(TestPlatforms.Browser, "This never really worked for browser, it was just lucky timing that browser's `close` event was executed in next browser tick, for this test. See also https://github.com/dotnet/runtime/issues/45538")]
228228
public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri server)
229229
{
230230
string message = "Hello WebSockets!";
@@ -233,8 +233,7 @@ public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri serve
233233
{
234234
var cts = new CancellationTokenSource(TimeOutMilliseconds);
235235

236-
// See issue for Browser websocket differences https://github.com/dotnet/runtime/issues/45538
237-
var closeStatus = PlatformDetection.IsBrowser ? WebSocketCloseStatus.NormalClosure : WebSocketCloseStatus.InvalidPayloadData;
236+
var closeStatus = WebSocketCloseStatus.InvalidPayloadData;
238237
string closeDescription = "CloseOutputAsync_Client_InvalidPayloadData";
239238

240239
await cws.SendAsync(WebSocketData.GetBufferFromText(message), WebSocketMessageType.Text, true, cts.Token);

src/libraries/System.Net.WebSockets.Client/tests/SendReceiveTest.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,11 @@ public async Task ZeroByteReceive_CompletesWhenDataAvailable(Uri server)
514514
// Now do a receive to get the payload.
515515
var receiveBuffer = new byte[1];
516516
t = ReceiveAsync(cws, new ArraySegment<byte>(receiveBuffer), ctsDefault.Token);
517-
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
517+
// this is not synchronously possible when the WS client is on another WebWorker
518+
if(!PlatformDetection.IsWasmThreadingSupported)
519+
{
520+
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
521+
}
518522

519523
r = await t;
520524
Assert.Equal(WebSocketMessageType.Binary, r.MessageType);

src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHost.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ public static Task<JSObject> ImportAsync(string moduleName, string moduleUrl, Ca
5959
return JSHostImplementation.ImportAsync(moduleName, moduleUrl, cancellationToken);
6060
}
6161

62-
public static SynchronizationContext? CurrentOrMainJSSynchronizationContext
62+
public static SynchronizationContext CurrentOrMainJSSynchronizationContext
6363
{
6464
[MethodImpl(MethodImplOptions.AggressiveInlining)]
6565
get
6666
{
6767
#if FEATURE_WASM_THREADS
68-
return JSSynchronizationContext.CurrentJSSynchronizationContext ?? JSSynchronizationContext.MainJSSynchronizationContext ?? null;
68+
return JSSynchronizationContext.CurrentJSSynchronizationContext ?? JSSynchronizationContext.MainJSSynchronizationContext!;
6969
#else
70-
return null;
70+
return null!;
7171
#endif
7272
}
7373
}

0 commit comments

Comments
 (0)