3
3
4
4
using System ;
5
5
using System . Collections . Generic ;
6
+ using System . Collections . Immutable ;
6
7
using System . Linq ;
7
8
using System . Threading . Tasks ;
8
9
@@ -16,14 +17,22 @@ namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;
16
17
/// </summary>
17
18
internal sealed class ParallelOperationManager < TManager , TEventHandler , TWorkload > : IDisposable
18
19
{
20
+ private const int PreStart = 2 ;
21
+ private readonly static int VSTEST_HOSTPRESTART_COUNT =
22
+ int . TryParse (
23
+ Environment . GetEnvironmentVariable ( nameof ( VSTEST_HOSTPRESTART_COUNT ) ) ,
24
+ out int num )
25
+ ? num
26
+ : PreStart ;
19
27
private readonly Func < TestRuntimeProviderInfo , TManager > _createNewManager ;
20
28
21
29
/// <summary>
22
30
/// Default number of Processes
23
31
/// </summary>
24
32
private TEventHandler ? _eventHandler ;
25
33
private Func < TEventHandler , TManager , TEventHandler > ? _getEventHandler ;
26
- private Action < TManager , TEventHandler , TWorkload > ? _runWorkload ;
34
+ private Func < TManager , TEventHandler , TWorkload , Task > ? _initializeWorkload ;
35
+ private Action < TManager , TEventHandler , TWorkload , bool , Task ? > ? _runWorkload ;
27
36
private bool _acceptMoreWork ;
28
37
private readonly List < ProviderSpecificWorkload < TWorkload > > _workloads = new ( ) ;
29
38
private readonly List < Slot > _managerSlots = new ( ) ;
@@ -33,6 +42,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
33
42
public int MaxParallelLevel { get ; }
34
43
public int OccupiedSlotCount { get ; private set ; }
35
44
public int AvailableSlotCount { get ; private set ; }
45
+ public int PreStartCount { get ; private set ; }
36
46
37
47
/// <summary>
38
48
/// Creates new instance of ParallelOperationManager.
@@ -44,6 +54,10 @@ public ParallelOperationManager(Func<TestRuntimeProviderInfo, TManager> createNe
44
54
{
45
55
_createNewManager = createNewManager ;
46
56
MaxParallelLevel = parallelLevel ;
57
+ // Pre-start only when we don't run in parallel. If we do run in parallel,
58
+ // then pre-starting has no additional value because while one host is starting,
59
+ // another is running tests.
60
+ PreStartCount = MaxParallelLevel == 1 ? VSTEST_HOSTPRESTART_COUNT : 0 ;
47
61
ClearSlots ( acceptMoreWork : true ) ;
48
62
}
49
63
@@ -53,33 +67,32 @@ private void ClearSlots(bool acceptMoreWork)
53
67
{
54
68
_acceptMoreWork = acceptMoreWork ;
55
69
_managerSlots . Clear ( ) ;
56
- _managerSlots . AddRange ( Enumerable . Range ( 0 , MaxParallelLevel ) . Select ( _ => new Slot ( ) ) ) ;
70
+ _managerSlots . AddRange ( Enumerable . Range ( 0 , MaxParallelLevel + PreStartCount ) . Select ( i => new Slot { Index = i } ) ) ;
57
71
SetOccupiedSlotCount ( ) ;
58
72
}
59
73
}
60
74
61
75
private void SetOccupiedSlotCount ( )
62
76
{
63
- AvailableSlotCount = _managerSlots . Count ( s => s . IsAvailable ) ;
77
+ AvailableSlotCount = _managerSlots . Count ( s => ! s . HasWork ) ;
64
78
OccupiedSlotCount = _managerSlots . Count - AvailableSlotCount ;
65
79
}
66
80
67
81
public void StartWork (
68
82
List < ProviderSpecificWorkload < TWorkload > > workloads ,
69
83
TEventHandler eventHandler ,
70
84
Func < TEventHandler , TManager , TEventHandler > getEventHandler ,
71
- Action < TManager , TEventHandler , TWorkload > runWorkload )
85
+ Func < TManager , TEventHandler , TWorkload , Task > initializeWorkload ,
86
+ Action < TManager , TEventHandler , TWorkload , bool , Task ? > runWorkload )
72
87
{
73
88
_ = workloads ?? throw new ArgumentNullException ( nameof ( workloads ) ) ;
74
89
_eventHandler = eventHandler ?? throw new ArgumentNullException ( nameof ( eventHandler ) ) ;
75
90
_getEventHandler = getEventHandler ?? throw new ArgumentNullException ( nameof ( getEventHandler ) ) ;
91
+ _initializeWorkload = initializeWorkload ?? throw new ArgumentNullException ( nameof ( initializeWorkload ) ) ;
76
92
_runWorkload = runWorkload ?? throw new ArgumentNullException ( nameof ( runWorkload ) ) ;
77
93
78
94
_workloads . AddRange ( workloads ) ;
79
95
80
- // This creates as many slots as possible even though we might not use them when we get less workloads to process,
81
- // this is not a big issue, and not worth optimizing, because the parallel level is determined by the logical CPU count,
82
- // so it is a small number.
83
96
ClearSlots ( acceptMoreWork : true ) ;
84
97
RunWorkInParallel ( ) ;
85
98
}
@@ -101,66 +114,115 @@ private bool RunWorkInParallel()
101
114
if ( _runWorkload == null )
102
115
throw new InvalidOperationException ( $ "{ nameof ( _runWorkload ) } was not provided.") ;
103
116
104
- // Reserve slots and assign them work under the lock so we keep
105
- // the slots consistent.
106
- List < SlotWorkloadPair > workToRun = new ( ) ;
117
+ // Reserve slots and assign them work under the lock so we keep the slots consistent.
118
+ Slot [ ] slots ;
107
119
lock ( _lock )
108
120
{
109
- if ( _workloads . Count == 0 )
110
- return false ;
111
-
112
121
// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
113
122
// it is possible that we will clear all slots, and have RunWorkInParallel waiting on the lock,
114
123
// so when it is allowed to enter it will try to add more work, but we already cancelled,
115
124
// so we should not start more work.
116
125
if ( ! _acceptMoreWork )
117
126
return false ;
118
127
119
- var availableSlots = _managerSlots . Where ( slot => slot . IsAvailable ) . ToList ( ) ;
120
- var availableWorkloads = _workloads . Where ( workload => workload != null ) . ToList ( ) ;
121
- var amount = Math . Min ( availableSlots . Count , availableWorkloads . Count ) ;
122
- var workloadsToRun = availableWorkloads . Take ( amount ) . ToList ( ) ;
123
-
128
+ // We grab all empty slots.
129
+ var availableSlots = _managerSlots . Where ( slot => ! slot . HasWork ) . ToImmutableArray ( ) ;
130
+ var occupiedSlots = MaxParallelLevel - ( availableSlots . Length - PreStartCount ) ;
131
+ // We grab all available workloads.
132
+ var availableWorkloads = _workloads . Where ( workload => workload != null ) . ToImmutableArray ( ) ;
133
+ // We take the amount of workloads to fill all the slots, or just as many workloads
134
+ // as there are if there are fewer workloads than slots.
135
+ var amount = Math . Min ( availableSlots . Length , availableWorkloads . Length ) ;
136
+ var workloadsToAdd = availableWorkloads . Take ( amount ) . ToImmutableArray ( ) ;
137
+
138
+ // We associate each workload to a slot, if we reached the max parallel
139
+ // level, then we will run only the initialize step of the given workload.
124
140
for ( int i = 0 ; i < amount ; i ++ )
125
141
{
126
142
var slot = availableSlots [ i ] ;
127
- slot . IsAvailable = false ;
128
- var workload = workloadsToRun [ i ] ;
129
- workToRun . Add ( new SlotWorkloadPair ( slot , workload ) ) ;
143
+ slot . HasWork = true ;
144
+ var workload = workloadsToAdd [ i ] ;
145
+ slot . ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel ;
146
+
147
+ var manager = _createNewManager ( workload . Provider ) ;
148
+ var eventHandler = _getEventHandler ( _eventHandler , manager ) ;
149
+ slot . EventHandler = eventHandler ;
150
+ slot . Manager = manager ;
151
+ slot . ManagerInfo = workload . Provider ;
152
+ slot . Work = workload . Work ;
153
+
130
154
_workloads . Remove ( workload ) ;
131
155
}
132
156
157
+ slots = _managerSlots . ToArray ( ) ;
133
158
SetOccupiedSlotCount ( ) ;
134
-
135
- foreach ( var pair in workToRun )
136
- {
137
- var manager = _createNewManager ( pair . Workload . Provider ) ;
138
- var eventHandler = _getEventHandler ( _eventHandler , manager ) ;
139
- pair . Slot . EventHandler = eventHandler ;
140
- pair . Slot . Manager = manager ;
141
- pair . Slot . ManagerInfo = pair . Workload . Provider ;
142
- pair . Slot . Work = pair . Workload . Work ;
143
- }
144
159
}
145
160
146
161
// Kick off the work in parallel outside of the lock so if we have more requests to run
147
162
// that come in at the same time we only block them from reserving the same slot at the same time
148
163
// but not from starting their assigned work at the same time.
149
- foreach ( var pair in workToRun )
164
+
165
+ // Kick off all pre-started hosts, beginning with the ones that have had the longest time to initialize.
166
+ //
167
+ // This code should be safe even outside the lock since HasWork is only changed when work is
168
+ // complete and only for the slot that completed work. It is not possible to complete work before
169
+ // starting it (which is what we are trying to do here).
170
+ var startedWork = 0 ;
171
+ foreach ( var slot in slots . Where ( s => s . HasWork && ! s . IsRunning && s . IsPreStarted ) . OrderBy ( s => s . PreStartTime ) )
172
+ {
173
+ startedWork ++ ;
174
+ slot . IsRunning = true ;
175
+ EqtTrace . Verbose ( $ "ParallelOperationManager.RunWorkInParallel: Running on pre-started host: { ( DateTime . Now . TimeOfDay - slot . PreStartTime ) . TotalMilliseconds } ms { slot . InitTask ? . Status } ") ;
176
+ _runWorkload ( slot . Manager ! , slot . EventHandler ! , slot . Work ! , slot . IsPreStarted , slot . InitTask ) ;
177
+
178
+ // We already started as many as we were allowed, jump out;
179
+ if ( startedWork == MaxParallelLevel )
180
+ {
181
+ break ;
182
+ }
183
+ }
184
+
185
+ // If we already started as many pre-started testhosts as the max parallel level allows,
186
+ // skip running more work.
187
+ if ( startedWork < MaxParallelLevel )
150
188
{
151
- try
189
+ foreach ( var slot in slots )
152
190
{
153
- _runWorkload ( pair . Slot . Manager ! , pair . Slot . EventHandler ! , pair . Workload . Work ! ) ;
191
+ if ( slot . HasWork && ! slot . IsRunning )
192
+ {
193
+ if ( ! slot . ShouldPreStart )
194
+ {
195
+ startedWork ++ ;
196
+ slot . IsRunning = true ;
197
+ EqtTrace . Verbose ( "ParallelOperationManager.RunWorkInParallel: Started work on a host." ) ;
198
+ _runWorkload ( slot . Manager ! , slot . EventHandler ! , slot . Work ! , slot . IsPreStarted , slot . InitTask ) ;
199
+ }
200
+ }
201
+
202
+ // We already started as many as we were allowed, jump out;
203
+ if ( startedWork == MaxParallelLevel )
204
+ {
205
+ break ;
206
+ }
154
207
}
155
- finally
208
+ }
209
+
210
+ var preStartedWork = 0 ;
211
+ foreach ( var slot in slots )
212
+ {
213
+ if ( slot . HasWork && slot . ShouldPreStart && ! slot . IsPreStarted )
156
214
{
157
- // clean the slot or something, to make sure we don't keep it reserved.
215
+ preStartedWork ++ ;
216
+ slot . PreStartTime = DateTime . Now . TimeOfDay ;
217
+ slot . IsPreStarted = true ;
218
+ EqtTrace . Verbose ( "ParallelOperationManager.RunWorkInParallel: Pre-starting a host." ) ;
219
+ slot . InitTask = _initializeWorkload ! ( slot . Manager ! , slot . EventHandler ! , slot . Work ! ) ;
158
220
}
159
221
}
160
222
161
223
// Return true when we started more work. Or false, when there was nothing more to do.
162
224
// This will propagate to handling of partial discovery or partial run.
163
- return workToRun . Count > 0 ;
225
+ return preStartedWork + startedWork > 0 ;
164
226
}
165
227
166
228
public bool RunNextWork ( TManager completedManager )
@@ -174,12 +236,12 @@ private void ClearCompletedSlot(TManager completedManager)
174
236
{
175
237
lock ( _lock )
176
238
{
177
- var completedSlot = _managerSlots . Where ( s => ReferenceEquals ( completedManager , s . Manager ) ) . ToList ( ) ;
239
+ var completedSlot = _managerSlots . Where ( s => ReferenceEquals ( completedManager , s . Manager ) ) . ToImmutableArray ( ) ;
178
240
// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
179
241
// it is possible that we will clear all slots, while ClearCompletedSlot is waiting on the lock,
180
242
// so when it is allowed to enter it will fail to find the respective slot and fail. In this case it is
181
243
// okay that the slot is not found, and we do nothing, because we already stopped all work and cleared the slots.
182
- if ( completedSlot . Count == 0 )
244
+ if ( completedSlot . Length == 0 )
183
245
{
184
246
if ( _acceptMoreWork )
185
247
{
@@ -191,13 +253,21 @@ private void ClearCompletedSlot(TManager completedManager)
191
253
}
192
254
}
193
255
194
- if ( completedSlot . Count > 1 )
256
+ if ( completedSlot . Length > 1 )
195
257
{
196
258
throw new InvalidOperationException ( "The provided manager was found in multiple slots." ) ;
197
259
}
198
260
199
261
var slot = completedSlot [ 0 ] ;
200
- slot . IsAvailable = true ;
262
+ slot . PreStartTime = TimeSpan . Zero ;
263
+ slot . Work = default ( TWorkload ) ;
264
+ slot . HasWork = false ;
265
+ slot . ShouldPreStart = false ;
266
+ slot . IsPreStarted = false ;
267
+ slot . InitTask = null ;
268
+ slot . IsRunning = false ;
269
+ slot . Manager = default ( TManager ) ;
270
+ slot . EventHandler = default ( TEventHandler ) ;
201
271
202
272
SetOccupiedSlotCount ( ) ;
203
273
}
@@ -207,9 +277,9 @@ public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParal
207
277
{
208
278
// We don't need to lock here, we just grab the current list of
209
279
// slots that are occupied (have managers) and run action on each one of them.
210
- var managers = _managerSlots . Where ( slot => ! slot . IsAvailable ) . Select ( slot => slot . Manager ) . ToList ( ) ;
280
+ var managers = _managerSlots . Where ( slot => slot . HasWork ) . Select ( slot => slot . Manager ) . ToImmutableArray ( ) ;
211
281
int i = 0 ;
212
- var actionTasks = new Task [ managers . Count ] ;
282
+ var actionTasks = new Task [ managers . Length ] ;
213
283
foreach ( var manager in managers )
214
284
{
215
285
if ( manager == null )
@@ -260,7 +330,14 @@ public void Dispose()
260
330
261
331
private class Slot
262
332
{
263
- public bool IsAvailable { get ; set ; } = true ;
333
+ public int Index { get ; set ; }
334
+ public bool HasWork { get ; set ; }
335
+
336
+ public bool ShouldPreStart { get ; set ; }
337
+
338
+ public Task ? InitTask { get ; set ; }
339
+
340
+ public bool IsRunning { get ; set ; }
264
341
265
342
public TManager ? Manager { get ; set ; }
266
343
@@ -269,16 +346,12 @@ private class Slot
269
346
public TEventHandler ? EventHandler { get ; set ; }
270
347
271
348
public TWorkload ? Work { get ; set ; }
272
- }
349
+ public bool IsPreStarted { get ; internal set ; }
350
+ public TimeSpan PreStartTime { get ; internal set ; }
273
351
274
- private class SlotWorkloadPair
275
- {
276
- public SlotWorkloadPair ( Slot slot , ProviderSpecificWorkload < TWorkload > workload )
352
+ public override string ToString ( )
277
353
{
278
- Slot = slot ;
279
- Workload = workload ;
354
+ return $ "{ Index } : HasWork: { HasWork } , ShouldPreStart: { ShouldPreStart } , IsPreStarted: { IsPreStarted } , IsRunning: { IsRunning } ";
280
355
}
281
- public Slot Slot { get ; }
282
- public ProviderSpecificWorkload < TWorkload > Workload { get ; }
283
356
}
284
357
}
0 commit comments