Skip to content

Commit 5a193ff

Browse files
CopilotdavidfowlJamesNK
authored
Improve pipeline concurrency with readiness-based scheduler (#12059)
* Initial plan
* Implement readiness-based scheduler for pipeline execution
  Co-authored-by: davidfowl <[email protected]>
* Add tests demonstrating improved pipeline concurrency
  Co-authored-by: davidfowl <[email protected]>
* Refactor pipeline execution to use a channel-based readiness scheduler with automatic concurrency management
* Optimize dependency management and cycle detection in pipeline execution
* Enhance execution logic in pipeline to improve exception tracking and completion handling
* Enhance dependency graph construction with detailed remarks and cycle detection using Kahn's algorithm
* Fix thread safety in pipeline step execution by adding locks around executed steps collection
* test too flaky
* Refactor pipeline execution to use Task DAG for dependency management and cycle detection
* Improve dependency handling in ExecuteStepWithDependencies to ensure proper exception tracking
* Refactor cycle detection in ExecuteStepWithDependencies to use a stack and hash set for improved clarity and performance
* Address code review feedback: add StringComparer.Ordinal, improve error messages, and use enum for cycle detection state
  Co-authored-by: JamesNK <[email protected]>
* Improve error messages: list all failed dependencies and match failures to steps directly
  Co-authored-by: JamesNK <[email protected]>
* Add unit tests for failure scenarios: circular dependencies, step execution failures, and dependency failures
  Co-authored-by: JamesNK <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: davidfowl <[email protected]>
Co-authored-by: David Fowler <[email protected]>
Co-authored-by: JamesNK <[email protected]>
1 parent f47ce2e commit 5a193ff

File tree

2 files changed

+456
-166
lines changed

2 files changed

+456
-166
lines changed

src/Aspire.Hosting/Pipelines/DistributedApplicationPipeline.cs

Lines changed: 173 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -113,39 +113,10 @@ public async Task ExecuteAsync(DeployingContext context)
113113

114114
ValidateSteps(allSteps);
115115

116-
var stepsByName = allSteps.ToDictionary(s => s.Name);
116+
var stepsByName = allSteps.ToDictionary(s => s.Name, StringComparer.Ordinal);
117117

118-
var levels = ResolveDependencies(allSteps, stepsByName);
119-
120-
foreach (var level in levels)
121-
{
122-
var tasks = level.Select(step => ExecuteStepAsync(step, context)).ToList();
123-
try
124-
{
125-
await Task.WhenAll(tasks).ConfigureAwait(false);
126-
}
127-
catch
128-
{
129-
// Collect all exceptions from failed tasks
130-
var exceptions = tasks
131-
.Where(t => t.IsFaulted)
132-
.SelectMany(t => t.Exception?.InnerExceptions ?? Enumerable.Empty<Exception>())
133-
.ToList();
134-
135-
if (exceptions.Count == 1)
136-
{
137-
ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
138-
}
139-
else if (exceptions.Count > 1)
140-
{
141-
throw new AggregateException(
142-
$"Multiple pipeline steps failed at the same level: {string.Join(", ", exceptions.OfType<InvalidOperationException>().Select(e => e.Message))}",
143-
exceptions);
144-
}
145-
146-
throw;
147-
}
148-
}
118+
// Build dependency graph and execute with readiness-based scheduler
119+
await ExecuteStepsAsTaskDag(allSteps, stepsByName, context).ConfigureAwait(false);
149120
}
150121

151122
private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingContext context)
@@ -167,7 +138,7 @@ private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingCo
167138

168139
private static void ValidateSteps(IEnumerable<PipelineStep> steps)
169140
{
170-
var stepNames = new HashSet<string>();
141+
var stepNames = new HashSet<string>(StringComparer.Ordinal);
171142

172143
foreach (var step in steps)
173144
{
@@ -201,126 +172,213 @@ private static void ValidateSteps(IEnumerable<PipelineStep> steps)
201172
}
202173

203174
/// <summary>
204-
/// Resolves the dependencies among the steps and organizes them into levels for execution.
175+
/// Executes pipeline steps by building a Task DAG where each step waits on its dependencies.
176+
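/// A step whose dependency failed is marked as failed without being run; this method does not use a CancellationToken, so all other started steps are still awaited to completion.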
205177
/// </summary>
206-
/// <param name="steps">The complete set of pipeline steps populated from annotations and the builder</param>
207-
/// <param name="stepsByName">A dictionary mapping step names to their corresponding step objects</param>
208-
/// <returns>A list of lists where each list contains the steps to be executed at the same level</returns>
209-
private static List<List<PipelineStep>> ResolveDependencies(
210-
IEnumerable<PipelineStep> steps,
211-
Dictionary<string, PipelineStep> stepsByName)
178+
private static async Task ExecuteStepsAsTaskDag(
179+
List<PipelineStep> steps,
180+
Dictionary<string, PipelineStep> stepsByName,
181+
DeployingContext context)
212182
{
213-
// Initialize a graph that represents a step and its dependencies
214-
// and an inDegree map to count the number of dependencies that
215-
// each step has.
216-
var graph = new Dictionary<string, List<string>>();
217-
var inDegree = new Dictionary<string, int>();
183+
// Validate no cycles exist in the dependency graph
184+
ValidateDependencyGraph(steps, stepsByName);
218185

186+
// Create a TaskCompletionSource for each step
187+
var stepCompletions = new Dictionary<string, TaskCompletionSource>(steps.Count, StringComparer.Ordinal);
219188
foreach (var step in steps)
220189
{
221-
graph[step.Name] = [];
222-
inDegree[step.Name] = 0;
190+
stepCompletions[step.Name] = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
223191
}
224192

225-
// Process all the `RequiredBy` relationships in the graph and add
226-
// the current step to the DependsOn list of each step that requires it.
227-
foreach (var step in steps)
193+
// Execute a step after its dependencies complete
194+
async Task ExecuteStepWithDependencies(PipelineStep step)
228195
{
229-
foreach (var requiredByStep in step.RequiredBySteps)
196+
var stepTcs = stepCompletions[step.Name];
197+
198+
// Wait for all dependencies to complete (will throw if any dependency failed)
199+
if (step.DependsOnSteps.Count > 0)
230200
{
231-
if (!graph.ContainsKey(requiredByStep))
201+
try
232202
{
233-
throw new InvalidOperationException(
234-
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
203+
var depTasks = step.DependsOnSteps.Select(depName => stepCompletions[depName].Task);
204+
await Task.WhenAll(depTasks).ConfigureAwait(false);
205+
}
206+
catch (Exception ex)
207+
{
208+
// Find all dependencies that failed
209+
var failedDeps = step.DependsOnSteps
210+
.Where(depName => stepCompletions[depName].Task.IsFaulted)
211+
.ToList();
212+
213+
var message = failedDeps.Count > 0
214+
? $"Step '{step.Name}' cannot run because {(failedDeps.Count == 1 ? "dependency" : "dependencies")} {string.Join(", ", failedDeps.Select(d => $"'{d}'"))} failed"
215+
: $"Step '{step.Name}' cannot run because a dependency failed";
216+
217+
// Wrap the dependency failure with context about this step
218+
var wrappedException = new InvalidOperationException(message, ex);
219+
stepTcs.TrySetException(wrappedException);
220+
return;
235221
}
222+
}
223+
224+
try
225+
{
226+
await ExecuteStepAsync(step, context).ConfigureAwait(false);
227+
228+
stepTcs.TrySetResult();
229+
}
230+
catch (Exception ex)
231+
{
232+
// Execution failure - mark as failed and re-throw so it's counted
233+
stepTcs.TrySetException(ex);
234+
throw;
235+
}
236+
}
237+
238+
// Start all steps (they'll wait on their dependencies internally)
239+
var allStepTasks = new Task[steps.Count];
240+
for (var i = 0; i < steps.Count; i++)
241+
{
242+
var step = steps[i];
243+
allStepTasks[i] = Task.Run(() => ExecuteStepWithDependencies(step));
244+
}
236245

237-
if (stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj) &&
238-
!requiredByStepObj.DependsOnSteps.Contains(step.Name))
246+
// Wait for all steps to complete (or fail)
247+
try
248+
{
249+
await Task.WhenAll(allStepTasks).ConfigureAwait(false);
250+
}
251+
catch
252+
{
253+
// Collect all failed steps and their names
254+
var failures = allStepTasks
255+
.Where(t => t.IsFaulted)
256+
.Select(t => t.Exception!)
257+
.SelectMany(ae => ae.InnerExceptions)
258+
.ToList();
259+
260+
if (failures.Count > 1)
261+
{
262+
// Match failures to steps to get their names
263+
var failedStepNames = new List<string>();
264+
for (var i = 0; i < allStepTasks.Length; i++)
239265
{
240-
requiredByStepObj.DependsOnSteps.Add(step.Name);
266+
if (allStepTasks[i].IsFaulted)
267+
{
268+
failedStepNames.Add(steps[i].Name);
269+
}
241270
}
271+
272+
var message = failedStepNames.Count > 0
273+
? $"Multiple pipeline steps failed: {string.Join(", ", failedStepNames.Distinct())}"
274+
: "Multiple pipeline steps failed.";
275+
276+
throw new AggregateException(message, failures);
242277
}
278+
279+
// Single failure - just rethrow
280+
throw;
243281
}
282+
}
283+
284+
/// <summary>
285+
/// Represents the visitation state of a step during cycle detection.
286+
/// </summary>
287+
private enum VisitState
288+
{
289+
/// <summary>
290+
/// The step has not been visited yet.
291+
/// </summary>
292+
Unvisited,
293+
294+
/// <summary>
295+
/// The step is currently being visited (on the current DFS path).
296+
/// </summary>
297+
Visiting,
298+
299+
/// <summary>
300+
/// The step has been fully visited (all descendants explored).
301+
/// </summary>
302+
Visited
303+
}
244304

245-
// Now that the `DependsOn` lists are fully populated, we can build the graph
246-
// and the inDegree map based only on the DependsOnSteps list.
305+
/// <summary>
306+
/// Validates that the pipeline steps form a directed acyclic graph (DAG) with no circular dependencies.
307+
/// </summary>
308+
/// <remarks>
309+
/// Uses depth-first search (DFS) to detect cycles. A cycle exists if we encounter a node that is
310+
/// currently being visited (in the Visiting state), meaning we've found a back edge in the graph.
311+
///
312+
/// Example: A → B → C is valid (no cycle)
313+
/// Example: A → B → C → A is invalid (cycle detected)
314+
/// Example: A → B, A → C, B → D, C → D is valid (diamond dependency, no cycle)
315+
/// </remarks>
316+
private static void ValidateDependencyGraph(
317+
List<PipelineStep> steps,
318+
Dictionary<string, PipelineStep> stepsByName)
319+
{
320+
// Process all RequiredBy relationships and normalize to DependsOn
247321
foreach (var step in steps)
248322
{
249-
foreach (var dependency in step.DependsOnSteps)
323+
foreach (var requiredByStep in step.RequiredBySteps)
250324
{
251-
if (!graph.TryGetValue(dependency, out var dependents))
325+
if (!stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj))
252326
{
253327
throw new InvalidOperationException(
254-
$"Step '{step.Name}' depends on unknown step '{dependency}'");
328+
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
255329
}
256330

257-
dependents.Add(step.Name);
258-
inDegree[step.Name]++;
331+
requiredByStepObj.DependsOnSteps.Add(step.Name);
259332
}
260333
}
261334

262-
// Perform a topological sort to determine the levels of execution and
263-
// initialize a queue with all steps that have no dependencies (inDegree of 0)
264-
// and can be executed immediately as part of the first level.
265-
var levels = new List<List<PipelineStep>>();
266-
var queue = new Queue<string>(
267-
inDegree.Where(kvp => kvp.Value == 0).Select(kvp => kvp.Key)
268-
);
269-
270-
// Process the queue until all steps have been organized into levels.
271-
// We start with the steps that have no dependencies and then iterate
272-
// through all the steps that depend on them to build out the graph
273-
// until no more steps are available to process.
274-
while (queue.Count > 0)
335+
var visitStates = new Dictionary<string, VisitState>(steps.Count, StringComparer.Ordinal);
336+
foreach (var step in steps)
275337
{
276-
var currentLevel = new List<PipelineStep>();
277-
var levelSize = queue.Count;
338+
visitStates[step.Name] = VisitState.Unvisited;
339+
}
340+
341+
// DFS to detect cycles
342+
void DetectCycles(string stepName, Stack<string> path)
343+
{
344+
var state = visitStates[stepName];
345+
346+
if (state == VisitState.Visiting) // Currently visiting - cycle detected!
347+
{
348+
var cycle = path.Reverse().SkipWhile(s => s != stepName).Append(stepName);
349+
throw new InvalidOperationException(
350+
$"Circular dependency detected in pipeline steps: {string.Join(" → ", cycle)}");
351+
}
352+
353+
if (state == VisitState.Visited) // Already fully visited - no need to check again
354+
{
355+
return;
356+
}
357+
358+
visitStates[stepName] = VisitState.Visiting;
359+
path.Push(stepName);
278360

279-
for (var i = 0; i < levelSize; i++)
361+
if (stepsByName.TryGetValue(stepName, out var step))
280362
{
281-
var stepName = queue.Dequeue();
282-
var step = stepsByName[stepName];
283-
currentLevel.Add(step);
284-
285-
// For each dependent step, reduce its inDegree by 1
286-
// in each iteration since its dependencies have been
287-
// processed. Once a dependent step has an inDegree
288-
// of 0, it means all its dependencies have been
289-
// processed and it can be added to the queue so we
290-
// can process the next level of dependencies.
291-
foreach (var dependent in graph[stepName])
363+
foreach (var dependency in step.DependsOnSteps)
292364
{
293-
inDegree[dependent]--;
294-
if (inDegree[dependent] == 0)
295-
{
296-
queue.Enqueue(dependent);
297-
}
365+
DetectCycles(dependency, path);
298366
}
299367
}
300368

301-
// Exhausting the queue means that we've resolved all
302-
// steps that can run in parallel.
303-
levels.Add(currentLevel);
369+
path.Pop();
370+
visitStates[stepName] = VisitState.Visited;
304371
}
305372

306-
// If the total number of steps in all levels does not equal
307-
// the total number of steps in the pipeline, it indicates that
308-
// there is a circular dependency in the graph. Steps are enqueued
309-
// for processing into levels above when all their dependencies are
310-
// resolved. When a cycle exists, the degrees of the steps in the cycle
311-
// will never reach zero and won't be enqueued for processing so the
312-
// total number of processed steps will be less than the total number
313-
// of steps in the pipeline.
314-
if (levels.Sum(l => l.Count) != steps.Count())
373+
// Check each step for cycles
374+
var path = new Stack<string>();
375+
foreach (var step in steps)
315376
{
316-
var processedSteps = new HashSet<string>(levels.SelectMany(l => l.Select(s => s.Name)));
317-
var stepsInCycle = steps.Where(s => !processedSteps.Contains(s.Name)).Select(s => s.Name).ToList();
318-
319-
throw new InvalidOperationException(
320-
$"Circular dependency detected in pipeline steps: {string.Join(", ", stepsInCycle)}");
377+
if (visitStates[step.Name] == VisitState.Unvisited)
378+
{
379+
DetectCycles(step.Name, path);
380+
}
321381
}
322-
323-
return levels;
324382
}
325383

326384
private static async Task ExecuteStepAsync(PipelineStep step, DeployingContext context)

0 commit comments

Comments
 (0)