Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 124 additions & 114 deletions src/Aspire.Hosting/Pipelines/DistributedApplicationPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,37 +115,8 @@ public async Task ExecuteAsync(DeployingContext context)

var stepsByName = allSteps.ToDictionary(s => s.Name);

var levels = ResolveDependencies(allSteps, stepsByName);

foreach (var level in levels)
{
var tasks = level.Select(step => ExecuteStepAsync(step, context)).ToList();
try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch
{
// Collect all exceptions from failed tasks
var exceptions = tasks
.Where(t => t.IsFaulted)
.SelectMany(t => t.Exception?.InnerExceptions ?? Enumerable.Empty<Exception>())
.ToList();

if (exceptions.Count == 1)
{
ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
}
else if (exceptions.Count > 1)
{
throw new AggregateException(
$"Multiple pipeline steps failed at the same level: {string.Join(", ", exceptions.OfType<InvalidOperationException>().Select(e => e.Message))}",
exceptions);
}

throw;
}
}
// Build dependency graph and execute with readiness-based scheduler
await ExecuteStepsAsTaskDag(allSteps, stepsByName, context).ConfigureAwait(false);
}

private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingContext context)
Expand Down Expand Up @@ -201,126 +172,165 @@ private static void ValidateSteps(IEnumerable<PipelineStep> steps)
}

/// <summary>
/// Resolves the dependencies among the steps and organizes them into levels for execution.
/// Executes pipeline steps by building a Task DAG where each step waits on its dependencies.
/// Uses CancellationToken to stop remaining work when any step fails.
/// </summary>
/// <param name="steps">The complete set of pipeline steps populated from annotations and the builder</param>
/// <param name="stepsByName">A dictionary mapping step names to their corresponding step objects</param>
/// <returns>A list of lists where each list contains the steps to be executed at the same level</returns>
private static List<List<PipelineStep>> ResolveDependencies(
IEnumerable<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName)
private static async Task ExecuteStepsAsTaskDag(
List<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName,
DeployingContext context)
{
// Initial a graph that represents a step and its dependencies
// and an inDegree map to count the number of dependencies that
// each step has.
var graph = new Dictionary<string, List<string>>();
var inDegree = new Dictionary<string, int>();
// Validate no cycles exist in the dependency graph
ValidateDependencyGraph(steps, stepsByName);

// Create a TaskCompletionSource for each step
var stepCompletions = new Dictionary<string, TaskCompletionSource>(steps.Count);
foreach (var step in steps)
{
graph[step.Name] = [];
inDegree[step.Name] = 0;
stepCompletions[step.Name] = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}

// Process all the `RequiredBy` relationships in the graph and adds
// the each `RequiredBy` step to the DependsOn list of the step that requires it.
foreach (var step in steps)
// Execute a step after its dependencies complete
async Task ExecuteStepWithDependencies(PipelineStep step)
{
foreach (var requiredByStep in step.RequiredBySteps)
var stepTcs = stepCompletions[step.Name];

// Wait for all dependencies to complete (will throw if any dependency failed)
if (step.DependsOnSteps.Count > 0)
{
if (!graph.ContainsKey(requiredByStep))
try
{
throw new InvalidOperationException(
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
var depTasks = step.DependsOnSteps.Select(depName => stepCompletions[depName].Task);
await Task.WhenAll(depTasks).ConfigureAwait(false);
}

if (stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj) &&
!requiredByStepObj.DependsOnSteps.Contains(step.Name))
catch (Exception ex)
{
requiredByStepObj.DependsOnSteps.Add(step.Name);
// Dependency failed - mark this step as failed and stop, but don't re-throw
// to avoid counting the same root cause exception multiple times
stepTcs.TrySetException(ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe figure out what dependency step failed, get its name, and wrap exception? It would be a lot clearer to see the name of the dependency that failed that caused this step to fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhanced error message to identify which dependency failed. The exception now includes: "Step 'X' cannot run because dependency 'Y' failed" with the wrapped original exception. Changes in commit 9b8c1f2.

return;
}
}

try
{
await ExecuteStepAsync(step, context).ConfigureAwait(false);

stepTcs.TrySetResult();
}
catch (Exception ex)
{
// Execution failure - mark as failed and re-throw so it's counted
stepTcs.TrySetException(ex);
throw;
}
}

// Start all steps (they'll wait on their dependencies internally)
var allStepTasks = new Task[steps.Count];
for (var i = 0; i < steps.Count; i++)
{
var step = steps[i];
allStepTasks[i] = Task.Run(() => ExecuteStepWithDependencies(step));
}

// Now that the `DependsOn` lists are fully populated, we can build the graph
// and the inDegree map based only on the DependOnSteps list.
// Wait for all steps to complete (or fail)
try
{
await Task.WhenAll(allStepTasks).ConfigureAwait(false);
}
catch
{
// Collect all failed steps
var failures = allStepTasks
.Where(t => t.IsFaulted)
.Select(t => t.Exception!)
.SelectMany(ae => ae.InnerExceptions)
.ToList();

if (failures.Count > 1)
{
throw new AggregateException("Multiple pipeline steps failed.", failures);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include the names of failing steps in the exception message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added step names extraction from exceptions. Aggregate exception message now includes: "Multiple pipeline steps failed: step1, step2, step3". Changes in commit 9b8c1f2.

}
Comment on lines 260 to 277
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aggregate exception message lost prior specificity (previous implementation listed individual step failure messages); consider including step names/messages for easier diagnosis.

Copilot uses AI. Check for mistakes.

// Single failure - just rethrow
throw;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same theme: Wrap exception with an exception with message that includes name?

}
}

/// <summary>
/// Validates that the pipeline steps form a directed acyclic graph (DAG) with no circular dependencies.
/// </summary>
/// <remarks>
/// Uses depth-first search (DFS) to detect cycles. A cycle exists if we encounter a node that is
/// currently being visited (on the current DFS path).
///
/// Example: A → B → C is valid (no cycle)
/// Example: A → B → C → A is invalid (cycle detected)
/// Example: A → B, A → C, B → D, C → D is valid (diamond dependency, no cycle)
/// </remarks>
private static void ValidateDependencyGraph(
List<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName)
{
// Process all RequiredBy relationships and normalize to DependsOn
foreach (var step in steps)
{
foreach (var dependency in step.DependsOnSteps)
foreach (var requiredByStep in step.RequiredBySteps)
{
if (!graph.TryGetValue(dependency, out var dependents))
if (!stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj))
{
throw new InvalidOperationException(
$"Step '{step.Name}' depends on unknown step '{dependency}'");
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
}

dependents.Add(step.Name);
inDegree[step.Name]++;
requiredByStepObj.DependsOnSteps.Add(step.Name);
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding requiredBy dependencies without checking for existing entries can introduce duplicate dependency names; guard with if (!requiredByStepObj.DependsOnSteps.Contains(step.Name)) before Add.

Suggested change
requiredByStepObj.DependsOnSteps.Add(step.Name);
if (!requiredByStepObj.DependsOnSteps.Contains(step.Name))
{
requiredByStepObj.DependsOnSteps.Add(step.Name);
}

Copilot uses AI. Check for mistakes.
}
}

// Perform a topological sort to determine the levels of execution and
// initialize a queue with all steps that have no dependencies (inDegree of 0)
// and can be executed immediately as part of the first level.
var levels = new List<List<PipelineStep>>();
var queue = new Queue<string>(
inDegree.Where(kvp => kvp.Value == 0).Select(kvp => kvp.Key)
);

// Process the queue until all steps have been organized into levels.
// We start with the steps that have no dependencies and then iterate
// through all the steps that depend on them to build out the graph
// until no more steps are available to process.
while (queue.Count > 0)
// Track visit state: 0 = unvisited, 1 = visiting (on current path), 2 = visited
var visitState = new Dictionary<string, int>(steps.Count);

// DFS to detect cycles
void DetectCycles(string stepName, List<string> path)
{
var currentLevel = new List<PipelineStep>();
var levelSize = queue.Count;
if (visitState.TryGetValue(stepName, out var state))
{
if (state == 1) // Currently visiting - cycle detected!
{
path.Add(stepName);
throw new InvalidOperationException(
$"Circular dependency detected in pipeline steps: {string.Join(" → ", path)}");
}
if (state == 2) // Already visited - no need to check again
{
return;
}
}

visitState[stepName] = 1; // Mark as visiting
path.Add(stepName);

for (var i = 0; i < levelSize; i++)
if (stepsByName.TryGetValue(stepName, out var step))
{
var stepName = queue.Dequeue();
var step = stepsByName[stepName];
currentLevel.Add(step);

// For each dependent step, reduce its inDegree by 1
// in each iteration since its dependencies have been
// processed. Once a dependent step has an inDegree
// of 0, it means all its dependencies have been
// processed and it can be added to the queue so we
// can process the next level of dependencies.
foreach (var dependent in graph[stepName])
foreach (var dependency in step.DependsOnSteps)
{
inDegree[dependent]--;
if (inDegree[dependent] == 0)
{
queue.Enqueue(dependent);
}
DetectCycles(dependency, path);
}
}

// Exhausting the queue means that we've resolved all
// steps that can run in parallel.
levels.Add(currentLevel);
path.RemoveAt(path.Count - 1);
visitState[stepName] = 2; // Mark as visited
}

// If the total number of steps in all levels does not equal
// the total number of steps in the pipeline, it indicates that
// there is a circular dependency in the graph. Steps are enqueued
// for processing into levels above when all their dependencies are
// resolved. When a cycle exists, the degrees of the steps in the cycle
// will never reach zero and won't be enqueued for processing so the
// total number of processed steps will be less than the total number
// of steps in the pipeline.
if (levels.Sum(l => l.Count) != steps.Count())
// Check each step for cycles
foreach (var step in steps)
{
var processedSteps = new HashSet<string>(levels.SelectMany(l => l.Select(s => s.Name)));
var stepsInCycle = steps.Where(s => !processedSteps.Contains(s.Name)).Select(s => s.Name).ToList();

throw new InvalidOperationException(
$"Circular dependency detected in pipeline steps: {string.Join(", ", stepsInCycle)}");
if (!visitState.ContainsKey(step.Name))
{
DetectCycles(step.Name, []);
}
}

return levels;
}

private static async Task ExecuteStepAsync(PipelineStep step, DeployingContext context)
Expand Down
Loading
Loading