Skip to content

Commit a4e2609

Browse files
authored
Split PortableThreadPool.WorkerThread start and loop body (#84490)
This is Part 1 of #84489 - landing support for async JS interop on threadpool threads in multi-threaded WebAssembly. We will need to start the threadpool worker threads on the browser in a special way, such that they can exit back to the JS event loop, and use callbacks to run the worker loop body. The current PR splits into a separate file the logic for starting threadpool worker threads, and the outer loop that waits for the semaphore that signals that work is available for the worker. The loop body (to be shared with the callback-based approach in a future PR) remains in PortableThreadPool.WorkerThread.cs as several new toplevel functions. Current PR is just refactoring existing code. No functional change. * Split PortableThreadPool.WorkerThread start and loop body For browser-wasm we will need to start the worker thread in a special way, and use callbacks to run the loop body. Current PR is just refactoring existing code. No functional change. * Change loop to use return instead of break * rename utility method to ShouldExitWorker
1 parent 790b14b commit a4e2609

File tree

3 files changed

+170
-133
lines changed

3 files changed

+170
-133
lines changed

src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2527,6 +2527,7 @@
25272527
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
25282528
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
25292529
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
2530+
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs"/>
25302531
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
25312532
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
25322533
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Diagnostics.Tracing;
5+
6+
namespace System.Threading
7+
{
8+
internal sealed partial class PortableThreadPool
9+
{
10+
/// <summary>
11+
/// The worker thread infastructure for the CLR thread pool.
12+
/// </summary>
13+
private static partial class WorkerThread
14+
{
15+
16+
/// <summary>
17+
/// Semaphore for controlling how many threads are currently working.
18+
/// </summary>
19+
private static readonly LowLevelLifoSemaphore s_semaphore =
20+
new LowLevelLifoSemaphore(
21+
0,
22+
MaxPossibleThreadCount,
23+
AppContextConfigHelper.GetInt32Config(
24+
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
25+
SemaphoreSpinCountDefault,
26+
false),
27+
onWait: () =>
28+
{
29+
if (NativeRuntimeEventSource.Log.IsEnabled())
30+
{
31+
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
32+
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
33+
}
34+
});
35+
36+
private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
37+
38+
private static void WorkerThreadStart()
39+
{
40+
Thread.CurrentThread.SetThreadPoolWorkerThreadName();
41+
42+
PortableThreadPool threadPoolInstance = ThreadPoolInstance;
43+
44+
if (NativeRuntimeEventSource.Log.IsEnabled())
45+
{
46+
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
47+
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
48+
}
49+
50+
LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
51+
LowLevelLifoSemaphore semaphore = s_semaphore;
52+
53+
while (true)
54+
{
55+
bool spinWait = true;
56+
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
57+
{
58+
WorkerDoWork(threadPoolInstance, ref spinWait);
59+
}
60+
61+
if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock))
62+
{
63+
break;
64+
}
65+
}
66+
}
67+
68+
69+
private static void CreateWorkerThread()
70+
{
71+
// Thread pool threads must start in the default execution context without transferring the context, so
72+
// using UnsafeStart() instead of Start()
73+
Thread workerThread = new Thread(s_workerThreadStart);
74+
workerThread.IsThreadPoolThread = true;
75+
workerThread.IsBackground = true;
76+
// thread name will be set in thread proc
77+
workerThread.UnsafeStart();
78+
}
79+
}
80+
}
81+
}

src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs

Lines changed: 88 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Diagnostics.Tracing;
5+
using System.Runtime.CompilerServices;
56

67
namespace System.Threading
78
{
@@ -28,147 +29,112 @@ private static partial class WorkerThread
2829
// preexisting threads from running out of memory when using new stack space in low-memory situations.
2930
public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB
3031

31-
/// <summary>
32-
/// Semaphore for controlling how many threads are currently working.
33-
/// </summary>
34-
private static readonly LowLevelLifoSemaphore s_semaphore =
35-
new LowLevelLifoSemaphore(
36-
0,
37-
MaxPossibleThreadCount,
38-
AppContextConfigHelper.GetInt32Config(
39-
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
40-
SemaphoreSpinCountDefault,
41-
false),
42-
onWait: () =>
32+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
33+
private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
34+
{
35+
bool alreadyRemovedWorkingWorker = false;
36+
while (TakeActiveRequest(threadPoolInstance))
37+
{
38+
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
39+
if (!ThreadPoolWorkQueue.Dispatch())
4340
{
44-
if (NativeRuntimeEventSource.Log.IsEnabled())
45-
{
46-
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
47-
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
48-
}
49-
});
41+
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
42+
// already removed this working worker in the counts. This typically happens when hill climbing
43+
// decreases the worker thread count goal.
44+
alreadyRemovedWorkingWorker = true;
45+
break;
46+
}
5047

51-
private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
48+
if (threadPoolInstance._separated.numRequestedWorkers <= 0)
49+
{
50+
break;
51+
}
5252

53-
private static void WorkerThreadStart()
54-
{
55-
Thread.CurrentThread.SetThreadPoolWorkerThreadName();
53+
// In highly bursty cases with short bursts of work, especially in the portable thread pool
54+
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
55+
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
56+
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
57+
// there is a pending request for work, introduce a slight delay before serving the next request.
58+
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
59+
// to schedule.
60+
Thread.UninterruptibleSleep0();
61+
if (!Environment.IsSingleProcessor)
62+
{
63+
Thread.SpinWait(1);
64+
}
65+
}
5666

57-
PortableThreadPool threadPoolInstance = ThreadPoolInstance;
67+
// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
68+
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
69+
// the semaphore would be released within the spin-wait window
70+
spinWait = !alreadyRemovedWorkingWorker;
5871

59-
if (NativeRuntimeEventSource.Log.IsEnabled())
72+
if (!alreadyRemovedWorkingWorker)
6073
{
61-
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
62-
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
74+
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
75+
// the number of working workers to reflect that we are done working for now
76+
RemoveWorkingWorker(threadPoolInstance);
6377
}
78+
}
6479

65-
LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
66-
LowLevelLifoSemaphore semaphore = s_semaphore;
80+
// returns true if the worker is shutting down
81+
// returns false if we should do another iteration
82+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
83+
private static bool ShouldExitWorker (PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock)
84+
{
85+
// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
86+
if (IsIOPending)
87+
{
88+
return false;
89+
}
6790

68-
while (true)
91+
threadAdjustmentLock.Acquire();
92+
try
6993
{
70-
bool spinWait = true;
71-
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
94+
// At this point, the thread's wait timed out. We are shutting down this thread.
95+
// We are going to decrement the number of existing threads to no longer include this one
96+
// and then change the max number of threads in the thread pool to reflect that we don't need as many
97+
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
98+
ThreadCounts counts = threadPoolInstance._separated.counts;
99+
while (true)
72100
{
73-
bool alreadyRemovedWorkingWorker = false;
74-
while (TakeActiveRequest(threadPoolInstance))
75-
{
76-
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
77-
if (!ThreadPoolWorkQueue.Dispatch())
78-
{
79-
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
80-
// already removed this working worker in the counts. This typically happens when hill climbing
81-
// decreases the worker thread count goal.
82-
alreadyRemovedWorkingWorker = true;
83-
break;
84-
}
85-
86-
if (threadPoolInstance._separated.numRequestedWorkers <= 0)
87-
{
88-
break;
89-
}
90-
91-
// In highly bursty cases with short bursts of work, especially in the portable thread pool
92-
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
93-
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
94-
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
95-
// there is a pending request for work, introduce a slight delay before serving the next request.
96-
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
97-
// to schedule.
98-
Thread.UninterruptibleSleep0();
99-
if (!Environment.IsSingleProcessor)
100-
{
101-
Thread.SpinWait(1);
102-
}
103-
}
104-
105-
// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
106-
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
107-
// the semaphore would be released within the spin-wait window
108-
spinWait = !alreadyRemovedWorkingWorker;
109-
110-
if (!alreadyRemovedWorkingWorker)
101+
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
102+
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
103+
// decreased below NumProcessingWork, as that would be indicative of such a case.
104+
if (counts.NumExistingThreads <= counts.NumProcessingWork)
111105
{
112-
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
113-
// the number of working workers to reflect that we are done working for now
114-
RemoveWorkingWorker(threadPoolInstance);
106+
// In this case, enough work came in that this thread should not time out and should go back to work.
107+
return false;
115108
}
116-
}
117-
118-
// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
119-
if (IsIOPending)
120-
{
121-
continue;
122-
}
123109

124-
threadAdjustmentLock.Acquire();
125-
try
126-
{
127-
// At this point, the thread's wait timed out. We are shutting down this thread.
128-
// We are going to decrement the number of existing threads to no longer include this one
129-
// and then change the max number of threads in the thread pool to reflect that we don't need as many
130-
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
131-
ThreadCounts counts = threadPoolInstance._separated.counts;
132-
while (true)
110+
ThreadCounts newCounts = counts;
111+
short newNumExistingThreads = --newCounts.NumExistingThreads;
112+
short newNumThreadsGoal =
113+
Math.Max(
114+
threadPoolInstance.MinThreadsGoal,
115+
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
116+
newCounts.NumThreadsGoal = newNumThreadsGoal;
117+
118+
ThreadCounts oldCounts =
119+
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
120+
if (oldCounts == counts)
133121
{
134-
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
135-
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
136-
// decreased below NumProcessingWork, as that would be indicative of such a case.
137-
if (counts.NumExistingThreads <= counts.NumProcessingWork)
122+
HillClimbing.ThreadPoolHillClimber.ForceChange(
123+
newNumThreadsGoal,
124+
HillClimbing.StateOrTransition.ThreadTimedOut);
125+
if (NativeRuntimeEventSource.Log.IsEnabled())
138126
{
139-
// In this case, enough work came in that this thread should not time out and should go back to work.
140-
break;
127+
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
141128
}
142-
143-
ThreadCounts newCounts = counts;
144-
short newNumExistingThreads = --newCounts.NumExistingThreads;
145-
short newNumThreadsGoal =
146-
Math.Max(
147-
threadPoolInstance.MinThreadsGoal,
148-
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
149-
newCounts.NumThreadsGoal = newNumThreadsGoal;
150-
151-
ThreadCounts oldCounts =
152-
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
153-
if (oldCounts == counts)
154-
{
155-
HillClimbing.ThreadPoolHillClimber.ForceChange(
156-
newNumThreadsGoal,
157-
HillClimbing.StateOrTransition.ThreadTimedOut);
158-
if (NativeRuntimeEventSource.Log.IsEnabled())
159-
{
160-
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
161-
}
162-
return;
163-
}
164-
165-
counts = oldCounts;
129+
return true;
166130
}
131+
132+
counts = oldCounts;
167133
}
168-
finally
169-
{
170-
threadAdjustmentLock.Release();
171-
}
134+
}
135+
finally
136+
{
137+
threadAdjustmentLock.Release();
172138
}
173139
}
174140

@@ -300,17 +266,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
300266
}
301267
return false;
302268
}
303-
304-
private static void CreateWorkerThread()
305-
{
306-
// Thread pool threads must start in the default execution context without transferring the context, so
307-
// using UnsafeStart() instead of Start()
308-
Thread workerThread = new Thread(s_workerThreadStart);
309-
workerThread.IsThreadPoolThread = true;
310-
workerThread.IsBackground = true;
311-
// thread name will be set in thread proc
312-
workerThread.UnsafeStart();
313-
}
314269
}
315270
}
316271
}

0 commit comments

Comments
 (0)