Skip to content

Commit 7ddc45a

Browse files
authored
[wasm-mt] Support async JS interop on threadpool threads (#84494)
This is the last part of #84489 - initial runtime support for async JS interop on threadpool threads in multi-threaded WebAssembly. Conceptually there are a few things here: * A mechanism for the native runtime to start some threads as "exitable" so that we don't clean up when they return from their thread start function. (we will clean up when the pthreads TLS dtor runs) * A change to make JSHostImplementation.s_csOwnedObjects a thread-static in multithreaded builds. This is because the map keys on a small integer handle that is allocated per-thread on the JS side as a handle for JS objects that are to be kept alive because managed code depends on them. (This is needed in general, but also makes the new smoke test work) * A version of the PortableThreadPool.WorkerThread that starts as an exitable thread and uses asynchronous callbacks to wait for available threadpool work items and returns to the JS event loop periodically in order to allow JS promises to settle. * A smoke test that does a background JS fetch and some async callbacks on it. --- * [wasm-mt] Enable JSInterop on threadpool workers Implement PortableThreadPool loop using semaphore callbacks manage emscripten event loop from PortableThreadPool.WorkerThread make sure to keep the thread alive after setting up the semaphore wait. Cleanup the thread when exiting minimal sample - fetch on a background thread works Add WebWorkerEventLoop internal class to managed event loop keepalive Start threadpool threads with keepalive checks Add a flag to mono's thread start wrappers to keep track of threads that may not want cleanup to run after the Start function returns. Use the flag when starting threadpool threads. make minimal FetchBackground sample more like a unit test Set WorkerThread.IsIOPending when the current thread has unsettled JS interop promises. When IsIOPending is true, the worker will not exit even if it has no more work to do. Instead it will repeatedly wait for more work to arrive or for all promises to settle. change minimal sample's fetch helper to artificially delay the delay is longer that the threadpool worker's semaphore timeout, in order to validate that the worker stays alive while there are unsettled promises * [wasm-mt] Add background interop to smoketest * update to use the LowLevelLifoAsyncWaitSemaphore * adjust to renamed PortableThreadPool helper methods * adjust to renamed WebWorkerEventLoop.HasJavaScriptInteropDependents * extend and rationalize the smoke test a bit Add a test that just starts a thread and asserts that it has a different thread ID than the main thread. This should allow us to rule out accidentally having the test pass on a single-threaded runtime * hide some debug output * smoke test: dispose of the ImportAsync result after the task is done * [wasm-mt] make JSHostImplementation.s_csOwnedObjects ThreadStatic The integer jsHandles are not global - they are essentially indices into a JS array. So the mapping from a jsHandle to a JSObject must be per-thread. This fixes the thread affinity assertions in the smoketest (which were false positives - we looked up a worker's jsHandle and got back the main thread's JSObject - and then asserted that it was accessed from the wrong thread) * remove locking on JSHostImplementation.CsOwnedObjects In single-threaded wasm, there is no need to lock since there is only one caller at a time. In multi-threaded wasm, the underlying dictionary is thread-static * [threads] make the "external eventloop" platform independent It only does something on WASM, but in principle if other platforms allow us to run some code after returning from a thread start function, we could do it there, too. * Add a Thread.HasExternalEventLoop managed property Set it from WebWorkerEventLoop.StartExitable. In native code, use it to set the `MONO_THREAD_CREATE_FLAGS_EXTERNAL_EVENTLOOP` flag when starting the thread. * rename JSHostImplementation.ThreadCsOwnedObjects (used to be CsOwnedObjects) Rename to make it clear that it's objects owned by the current thread, not the runtime globally * [checked] assert GC Safe mode, when returning to external eventloop
1 parent 4719575 commit 7ddc45a

File tree

25 files changed

+414
-52
lines changed

25 files changed

+414
-52
lines changed

src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,9 +2530,9 @@
25302530
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
25312531
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
25322532
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
2533-
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" />
2533+
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" Condition="'$(TargetsBrowser)' != 'true'"/>
25342534
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
2535-
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
2535+
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or ('$(TargetsBrowser)' == 'true' and '$(FeatureWasmThreads)' != 'true') or '$(TargetsWasi)' == 'true'" />
25362536
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
25372537
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.cs" />
25382538
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />

src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ public static partial class ThreadPool
1919
{
2020
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
2121
// the runtime may use the thread for processing other work
22+
#if !(TARGET_BROWSER && FEATURE_WASM_THREADS)
2223
internal static bool YieldFromDispatchLoop => false;
24+
#endif
2325

2426
#if NATIVEAOT
2527
private const bool IsWorkerTrackingEnabledInConfig = false;

src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/LegacyExports.cs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,15 @@ internal static void PreventTrimming()
3232

3333
public static void GetCSOwnedObjectByJSHandleRef(nint jsHandle, int shouldAddInflight, out JSObject? result)
3434
{
35-
lock (JSHostImplementation.s_csOwnedObjects)
35+
if (JSHostImplementation.ThreadCsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference))
3636
{
37-
if (JSHostImplementation.s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference))
37+
reference.TryGetTarget(out JSObject? jsObject);
38+
if (shouldAddInflight != 0)
3839
{
39-
reference.TryGetTarget(out JSObject? jsObject);
40-
if (shouldAddInflight != 0)
41-
{
42-
jsObject?.AddInFlight();
43-
}
44-
result = jsObject;
45-
return;
40+
jsObject?.AddInFlight();
4641
}
42+
result = jsObject;
43+
return;
4744
}
4845
result = null;
4946
}
@@ -77,14 +74,12 @@ public static void CreateCSOwnedProxyRef(nint jsHandle, LegacyHostImplementation
7774

7875
JSObject? res = null;
7976

80-
lock (JSHostImplementation.s_csOwnedObjects)
77+
if (!JSHostImplementation.ThreadCsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
78+
!reference.TryGetTarget(out res) ||
79+
res.IsDisposed)
8180
{
82-
if (!JSHostImplementation.s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
83-
!reference.TryGetTarget(out res) ||
84-
res.IsDisposed)
85-
{
8681
#pragma warning disable CS0612 // Type or member is obsolete
87-
res = mappedType switch
82+
res = mappedType switch
8883
{
8984
LegacyHostImplementation.MappedType.JSObject => new JSObject(jsHandle),
9085
LegacyHostImplementation.MappedType.Array => new Array(jsHandle),
@@ -95,8 +90,7 @@ public static void CreateCSOwnedProxyRef(nint jsHandle, LegacyHostImplementation
9590
_ => throw new ArgumentOutOfRangeException(nameof(mappedType))
9691
};
9792
#pragma warning restore CS0612 // Type or member is obsolete
98-
JSHostImplementation.s_csOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
99-
}
93+
JSHostImplementation.ThreadCsOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
10094
}
10195
if (shouldAddInflight != 0)
10296
{

src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,20 @@ internal static partial class JSHostImplementation
1515
private const string TaskGetResultName = "get_Result";
1616
private static MethodInfo? s_taskGetResultMethodInfo;
1717
// we use this to maintain identity of JSHandle for a JSObject proxy
18-
public static readonly Dictionary<int, WeakReference<JSObject>> s_csOwnedObjects = new Dictionary<int, WeakReference<JSObject>>();
18+
#if FEATURE_WASM_THREADS
19+
[ThreadStatic]
20+
#endif
21+
private static Dictionary<int, WeakReference<JSObject>>? s_csOwnedObjects;
22+
23+
public static Dictionary<int, WeakReference<JSObject>> ThreadCsOwnedObjects
24+
{
25+
get
26+
{
27+
s_csOwnedObjects ??= new ();
28+
return s_csOwnedObjects;
29+
}
30+
}
31+
1932
// we use this to maintain identity of GCHandle for a managed object
2033
public static Dictionary<object, IntPtr> s_gcHandleFromJSOwnedObject = new Dictionary<object, IntPtr>(ReferenceEqualityComparer.Instance);
2134

@@ -24,10 +37,7 @@ public static void ReleaseCSOwnedObject(nint jsHandle)
2437
{
2538
if (jsHandle != IntPtr.Zero)
2639
{
27-
lock (s_csOwnedObjects)
28-
{
29-
s_csOwnedObjects.Remove((int)jsHandle);
30-
}
40+
ThreadCsOwnedObjects.Remove((int)jsHandle);
3141
Interop.Runtime.ReleaseCSOwnedObject(jsHandle);
3242
}
3343
}
@@ -175,17 +185,14 @@ public static unsafe void FreeMethodSignatureBuffer(JSFunctionBinding signature)
175185

176186
public static JSObject CreateCSOwnedProxy(nint jsHandle)
177187
{
178-
JSObject? res = null;
188+
JSObject? res;
179189

180-
lock (s_csOwnedObjects)
190+
if (!ThreadCsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
191+
!reference.TryGetTarget(out res) ||
192+
res.IsDisposed)
181193
{
182-
if (!s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
183-
!reference.TryGetTarget(out res) ||
184-
res.IsDisposed)
185-
{
186-
res = new JSObject(jsHandle);
187-
s_csOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
188-
}
194+
res = new JSObject(jsHandle);
195+
ThreadCsOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
189196
}
190197
return res;
191198
}

src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Legacy/LegacyHostImplementation.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ public static void ReleaseInFlight(object obj)
2121
[MethodImpl(MethodImplOptions.AggressiveInlining)]
2222
public static void RegisterCSOwnedObject(JSObject proxy)
2323
{
24-
lock (JSHostImplementation.s_csOwnedObjects)
25-
{
26-
JSHostImplementation.s_csOwnedObjects[(int)proxy.JSHandle] = new WeakReference<JSObject>(proxy, trackResurrection: true);
27-
}
24+
JSHostImplementation.ThreadCsOwnedObjects[(int)proxy.JSHandle] = new WeakReference<JSObject>(proxy, trackResurrection: true);
2825
}
2926

3027
public static MarshalType GetMarshalTypeFromType(Type type)

src/mono/System.Private.CoreLib/System.Private.CoreLib.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@
281281
<ItemGroup Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(FeatureWasmThreads)' == 'true'">
282282
<Compile Include="$(BclSourcesRoot)\System\Threading\ThreadPoolBoundHandle.Browser.Threads.Mono.cs" />
283283
<Compile Include="$(BclSourcesRoot)\System\Threading\LowLevelLifoAsyncWaitSemaphore.Browser.Threads.Mono.cs" />
284+
<Compile Include="$(BclSourcesRoot)\System\Threading\PortableThreadPool.Browser.Threads.Mono.cs" />
285+
<Compile Include="$(BclSourcesRoot)\System\Threading\PortableThreadPool.WorkerThread.Browser.Threads.Mono.cs" />
286+
<Compile Include="$(BclSourcesRoot)\System\Threading\ThreadPool.Browser.Threads.Mono.cs" />
284287
<Compile Include="$(BclSourcesRoot)\System\Threading\WebWorkerEventLoop.Browser.Threads.Mono.cs" />
285288
</ItemGroup>
286289
<ItemGroup Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(FeatureWasmThreads)' != 'true'">
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace System.Threading;
5+
6+
internal sealed partial class PortableThreadPool
7+
{
8+
private static partial class WorkerThread
9+
{
10+
private static bool IsIOPending => WebWorkerEventLoop.HasJavaScriptInteropDependents;
11+
}
12+
13+
private struct CpuUtilizationReader
14+
{
15+
#pragma warning disable CA1822
16+
public double CurrentUtilization => 0.0; // FIXME: can we do better
17+
#pragma warning restore CA1822
18+
}
19+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Diagnostics;
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Diagnostics.Tracing;
7+
using System.Runtime.CompilerServices;
8+
9+
namespace System.Threading
10+
{
11+
internal sealed partial class PortableThreadPool
12+
{
13+
/// <summary>
14+
/// The worker thread infastructure for the CLR thread pool.
15+
/// </summary>
16+
private static partial class WorkerThread
17+
{
18+
/// <summary>
19+
/// Semaphore for controlling how many threads are currently working.
20+
/// </summary>
21+
private static readonly LowLevelLifoAsyncWaitSemaphore s_semaphore =
22+
new LowLevelLifoAsyncWaitSemaphore(
23+
0,
24+
MaxPossibleThreadCount,
25+
AppContextConfigHelper.GetInt32Config(
26+
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
27+
SemaphoreSpinCountDefault,
28+
false),
29+
onWait: () =>
30+
{
31+
if (NativeRuntimeEventSource.Log.IsEnabled())
32+
{
33+
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
34+
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
35+
}
36+
});
37+
38+
private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
39+
40+
private sealed record SemaphoreWaitState(PortableThreadPool ThreadPoolInstance, LowLevelLock ThreadAdjustmentLock, WebWorkerEventLoop.KeepaliveToken KeepaliveToken)
41+
{
42+
public bool SpinWait = true;
43+
44+
public void ResetIteration() {
45+
SpinWait = true;
46+
}
47+
}
48+
49+
private static void WorkerThreadStart()
50+
{
51+
Thread.CurrentThread.SetThreadPoolWorkerThreadName();
52+
53+
PortableThreadPool threadPoolInstance = ThreadPoolInstance;
54+
55+
if (NativeRuntimeEventSource.Log.IsEnabled())
56+
{
57+
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
58+
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
59+
}
60+
61+
LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
62+
var keepaliveToken = WebWorkerEventLoop.KeepalivePush();
63+
SemaphoreWaitState state = new(threadPoolInstance, threadAdjustmentLock, keepaliveToken) { SpinWait = true };
64+
// set up the callbacks for semaphore waits, tell
65+
// emscripten to keep the thread alive, and return to
66+
// the JS event loop.
67+
WaitForWorkLoop(s_semaphore, state);
68+
// return from thread start with keepalive - the thread will stay alive in the JS event loop
69+
}
70+
71+
private static readonly Action<LowLevelLifoAsyncWaitSemaphore, object?> s_WorkLoopSemaphoreSuccess = new(WorkLoopSemaphoreSuccess);
72+
private static readonly Action<LowLevelLifoAsyncWaitSemaphore, object?> s_WorkLoopSemaphoreTimedOut = new(WorkLoopSemaphoreTimedOut);
73+
74+
private static void WaitForWorkLoop(LowLevelLifoAsyncWaitSemaphore semaphore, SemaphoreWaitState state)
75+
{
76+
semaphore.PrepareAsyncWait(ThreadPoolThreadTimeoutMs, s_WorkLoopSemaphoreSuccess, s_WorkLoopSemaphoreTimedOut, state);
77+
// thread should still be kept alive
78+
Debug.Assert(state.KeepaliveToken.Valid);
79+
}
80+
81+
private static void WorkLoopSemaphoreSuccess(LowLevelLifoAsyncWaitSemaphore semaphore, object? stateObject)
82+
{
83+
SemaphoreWaitState state = (SemaphoreWaitState)stateObject!;
84+
WorkerDoWork(state.ThreadPoolInstance, ref state.SpinWait);
85+
// Go around the loop one more time, keeping existing mutated state
86+
WaitForWorkLoop(semaphore, state);
87+
}
88+
89+
private static void WorkLoopSemaphoreTimedOut(LowLevelLifoAsyncWaitSemaphore semaphore, object? stateObject)
90+
{
91+
SemaphoreWaitState state = (SemaphoreWaitState)stateObject!;
92+
if (ShouldExitWorker(state.ThreadPoolInstance, state.ThreadAdjustmentLock)) {
93+
// we're done, kill the thread.
94+
95+
// we're wrapped in an emscripten eventloop handler which will consult the
96+
// keepalive count, destroy the thread and run the TLS dtor which will
97+
// unregister the thread from Mono
98+
state.KeepaliveToken.Pop();
99+
return;
100+
} else {
101+
// more work showed up while we were shutting down, go around one more time
102+
state.ResetIteration();
103+
WaitForWorkLoop(semaphore, state);
104+
}
105+
}
106+
107+
private static void CreateWorkerThread()
108+
{
109+
// Thread pool threads must start in the default execution context without transferring the context, so
110+
// using captureContext: false.
111+
Thread workerThread = new Thread(s_workerThreadStart);
112+
workerThread.IsThreadPoolThread = true;
113+
workerThread.IsBackground = true;
114+
// thread name will be set in thread proc
115+
116+
// This thread will return to the JS event loop - tell the runtime not to cleanup
117+
// after the start function returns, if the Emscripten keepalive is non-zero.
118+
WebWorkerEventLoop.StartExitable(workerThread, captureContext: false);
119+
}
120+
}
121+
}
122+
}

src/mono/System.Private.CoreLib/src/System/Threading/Thread.Mono.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public partial class Thread
3737
private int interruption_requested;
3838
private IntPtr longlived;
3939
internal bool threadpool_thread;
40+
internal bool external_eventloop; // browser-wasm: thread will return to the JS eventloop
4041
/* These are used from managed code */
4142
internal byte apartment_state;
4243
internal int managed_id;
@@ -352,5 +353,17 @@ private static void SpinWait_nop()
352353
private static extern void SetPriority(Thread thread, int priority);
353354

354355
internal int GetSmallId() => small_id;
356+
357+
internal bool HasExternalEventLoop
358+
{
359+
get
360+
{
361+
return external_eventloop;
362+
}
363+
set
364+
{
365+
external_eventloop = value;
366+
}
367+
}
355368
}
356369
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace System.Threading
5+
{
6+
public static partial class ThreadPool
7+
{
8+
// Indicates that the threadpool should yield the thread from the dispatch loop to the
9+
// runtime periodically. We use this to return back to the JS event loop so that the JS
10+
// event queue can be drained
11+
internal static bool YieldFromDispatchLoop => true;
12+
}
13+
}

0 commit comments

Comments
 (0)