Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 173 additions & 115 deletions src/Aspire.Hosting/Pipelines/DistributedApplicationPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,39 +113,10 @@ public async Task ExecuteAsync(DeployingContext context)

ValidateSteps(allSteps);

var stepsByName = allSteps.ToDictionary(s => s.Name);
var stepsByName = allSteps.ToDictionary(s => s.Name, StringComparer.Ordinal);

var levels = ResolveDependencies(allSteps, stepsByName);

foreach (var level in levels)
{
var tasks = level.Select(step => ExecuteStepAsync(step, context)).ToList();
try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch
{
// Collect all exceptions from failed tasks
var exceptions = tasks
.Where(t => t.IsFaulted)
.SelectMany(t => t.Exception?.InnerExceptions ?? Enumerable.Empty<Exception>())
.ToList();

if (exceptions.Count == 1)
{
ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
}
else if (exceptions.Count > 1)
{
throw new AggregateException(
$"Multiple pipeline steps failed at the same level: {string.Join(", ", exceptions.OfType<InvalidOperationException>().Select(e => e.Message))}",
exceptions);
}

throw;
}
}
// Build dependency graph and execute with readiness-based scheduler
await ExecuteStepsAsTaskDag(allSteps, stepsByName, context).ConfigureAwait(false);
}

private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingContext context)
Expand All @@ -167,7 +138,7 @@ private static IEnumerable<PipelineStep> CollectStepsFromAnnotations(DeployingCo

private static void ValidateSteps(IEnumerable<PipelineStep> steps)
{
var stepNames = new HashSet<string>();
var stepNames = new HashSet<string>(StringComparer.Ordinal);

foreach (var step in steps)
{
Expand Down Expand Up @@ -201,126 +172,213 @@ private static void ValidateSteps(IEnumerable<PipelineStep> steps)
}

/// <summary>
/// Resolves the dependencies among the steps and organizes them into levels for execution.
/// Executes pipeline steps by building a Task DAG where each step waits on its dependencies.
/// Uses CancellationToken to stop remaining work when any step fails.
/// </summary>
/// <param name="steps">The complete set of pipeline steps populated from annotations and the builder</param>
/// <param name="stepsByName">A dictionary mapping step names to their corresponding step objects</param>
/// <returns>A list of lists where each list contains the steps to be executed at the same level</returns>
private static List<List<PipelineStep>> ResolveDependencies(
IEnumerable<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName)
private static async Task ExecuteStepsAsTaskDag(
List<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName,
DeployingContext context)
{
// Initial a graph that represents a step and its dependencies
// and an inDegree map to count the number of dependencies that
// each step has.
var graph = new Dictionary<string, List<string>>();
var inDegree = new Dictionary<string, int>();
// Validate no cycles exist in the dependency graph
ValidateDependencyGraph(steps, stepsByName);

// Create a TaskCompletionSource for each step
var stepCompletions = new Dictionary<string, TaskCompletionSource>(steps.Count, StringComparer.Ordinal);
foreach (var step in steps)
{
graph[step.Name] = [];
inDegree[step.Name] = 0;
stepCompletions[step.Name] = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}

// Process all the `RequiredBy` relationships in the graph and adds
// the each `RequiredBy` step to the DependsOn list of the step that requires it.
foreach (var step in steps)
// Execute a step after its dependencies complete
async Task ExecuteStepWithDependencies(PipelineStep step)
{
foreach (var requiredByStep in step.RequiredBySteps)
var stepTcs = stepCompletions[step.Name];

// Wait for all dependencies to complete (will throw if any dependency failed)
if (step.DependsOnSteps.Count > 0)
{
if (!graph.ContainsKey(requiredByStep))
try
{
throw new InvalidOperationException(
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
var depTasks = step.DependsOnSteps.Select(depName => stepCompletions[depName].Task);
await Task.WhenAll(depTasks).ConfigureAwait(false);
}
catch (Exception ex)
{
// Find all dependencies that failed
var failedDeps = step.DependsOnSteps
.Where(depName => stepCompletions[depName].Task.IsFaulted)
.ToList();

var message = failedDeps.Count > 0
? $"Step '{step.Name}' cannot run because {(failedDeps.Count == 1 ? "dependency" : "dependencies")} {string.Join(", ", failedDeps.Select(d => $"'{d}'"))} failed"
: $"Step '{step.Name}' cannot run because a dependency failed";

// Wrap the dependency failure with context about this step
var wrappedException = new InvalidOperationException(message, ex);
stepTcs.TrySetException(wrappedException);
return;
}
}

try
{
await ExecuteStepAsync(step, context).ConfigureAwait(false);

stepTcs.TrySetResult();
}
catch (Exception ex)
{
// Execution failure - mark as failed and re-throw so it's counted
stepTcs.TrySetException(ex);
throw;
}
}

// Start all steps (they'll wait on their dependencies internally)
var allStepTasks = new Task[steps.Count];
for (var i = 0; i < steps.Count; i++)
{
var step = steps[i];
allStepTasks[i] = Task.Run(() => ExecuteStepWithDependencies(step));
}

if (stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj) &&
!requiredByStepObj.DependsOnSteps.Contains(step.Name))
// Wait for all steps to complete (or fail)
try
{
await Task.WhenAll(allStepTasks).ConfigureAwait(false);
}
catch
{
// Collect all failed steps and their names
var failures = allStepTasks
.Where(t => t.IsFaulted)
.Select(t => t.Exception!)
.SelectMany(ae => ae.InnerExceptions)
.ToList();

if (failures.Count > 1)
{
// Match failures to steps to get their names
var failedStepNames = new List<string>();
for (var i = 0; i < allStepTasks.Length; i++)
{
requiredByStepObj.DependsOnSteps.Add(step.Name);
if (allStepTasks[i].IsFaulted)
{
failedStepNames.Add(steps[i].Name);
}
}

var message = failedStepNames.Count > 0
? $"Multiple pipeline steps failed: {string.Join(", ", failedStepNames.Distinct())}"
: "Multiple pipeline steps failed.";

throw new AggregateException(message, failures);
}

// Single failure - just rethrow
throw;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same theme: Wrap exception with an exception with message that includes name?

}
}

/// <summary>
/// Represents the visitation state of a step during cycle detection.
/// </summary>
private enum VisitState
{
/// <summary>
/// The step has not been visited yet.
/// </summary>
Unvisited,

/// <summary>
/// The step is currently being visited (on the current DFS path).
/// </summary>
Visiting,

/// <summary>
/// The step has been fully visited (all descendants explored).
/// </summary>
Visited
}

// Now that the `DependsOn` lists are fully populated, we can build the graph
// and the inDegree map based only on the DependOnSteps list.
/// <summary>
/// Validates that the pipeline steps form a directed acyclic graph (DAG) with no circular dependencies.
/// </summary>
/// <remarks>
/// Uses depth-first search (DFS) to detect cycles. A cycle exists if we encounter a node that is
/// currently being visited (in the Visiting state), meaning we've found a back edge in the graph.
///
/// Example: A → B → C is valid (no cycle)
/// Example: A → B → C → A is invalid (cycle detected)
/// Example: A → B, A → C, B → D, C → D is valid (diamond dependency, no cycle)
/// </remarks>
private static void ValidateDependencyGraph(
List<PipelineStep> steps,
Dictionary<string, PipelineStep> stepsByName)
{
// Process all RequiredBy relationships and normalize to DependsOn
foreach (var step in steps)
{
foreach (var dependency in step.DependsOnSteps)
foreach (var requiredByStep in step.RequiredBySteps)
{
if (!graph.TryGetValue(dependency, out var dependents))
if (!stepsByName.TryGetValue(requiredByStep, out var requiredByStepObj))
{
throw new InvalidOperationException(
$"Step '{step.Name}' depends on unknown step '{dependency}'");
$"Step '{step.Name}' is required by unknown step '{requiredByStep}'");
}

dependents.Add(step.Name);
inDegree[step.Name]++;
requiredByStepObj.DependsOnSteps.Add(step.Name);
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding requiredBy dependencies without checking for existing entries can introduce duplicate dependency names; guard with if (!requiredByStepObj.DependsOnSteps.Contains(step.Name)) before Add.

Suggested change
requiredByStepObj.DependsOnSteps.Add(step.Name);
if (!requiredByStepObj.DependsOnSteps.Contains(step.Name))
{
requiredByStepObj.DependsOnSteps.Add(step.Name);
}

Copilot uses AI. Check for mistakes.
}
}

// Perform a topological sort to determine the levels of execution and
// initialize a queue with all steps that have no dependencies (inDegree of 0)
// and can be executed immediately as part of the first level.
var levels = new List<List<PipelineStep>>();
var queue = new Queue<string>(
inDegree.Where(kvp => kvp.Value == 0).Select(kvp => kvp.Key)
);

// Process the queue until all steps have been organized into levels.
// We start with the steps that have no dependencies and then iterate
// through all the steps that depend on them to build out the graph
// until no more steps are available to process.
while (queue.Count > 0)
var visitStates = new Dictionary<string, VisitState>(steps.Count, StringComparer.Ordinal);
foreach (var step in steps)
{
var currentLevel = new List<PipelineStep>();
var levelSize = queue.Count;
visitStates[step.Name] = VisitState.Unvisited;
}

// DFS to detect cycles
void DetectCycles(string stepName, Stack<string> path)
{
var state = visitStates[stepName];

if (state == VisitState.Visiting) // Currently visiting - cycle detected!
{
var cycle = path.Reverse().SkipWhile(s => s != stepName).Append(stepName);
throw new InvalidOperationException(
$"Circular dependency detected in pipeline steps: {string.Join(" → ", cycle)}");
}

if (state == VisitState.Visited) // Already fully visited - no need to check again
{
return;
}

visitStates[stepName] = VisitState.Visiting;
path.Push(stepName);

for (var i = 0; i < levelSize; i++)
if (stepsByName.TryGetValue(stepName, out var step))
{
var stepName = queue.Dequeue();
var step = stepsByName[stepName];
currentLevel.Add(step);

// For each dependent step, reduce its inDegree by 1
// in each iteration since its dependencies have been
// processed. Once a dependent step has an inDegree
// of 0, it means all its dependencies have been
// processed and it can be added to the queue so we
// can process the next level of dependencies.
foreach (var dependent in graph[stepName])
foreach (var dependency in step.DependsOnSteps)
{
inDegree[dependent]--;
if (inDegree[dependent] == 0)
{
queue.Enqueue(dependent);
}
DetectCycles(dependency, path);
}
}

// Exhausting the queue means that we've resolved all
// steps that can run in parallel.
levels.Add(currentLevel);
path.Pop();
visitStates[stepName] = VisitState.Visited;
}

// If the total number of steps in all levels does not equal
// the total number of steps in the pipeline, it indicates that
// there is a circular dependency in the graph. Steps are enqueued
// for processing into levels above when all their dependencies are
// resolved. When a cycle exists, the degrees of the steps in the cycle
// will never reach zero and won't be enqueued for processing so the
// total number of processed steps will be less than the total number
// of steps in the pipeline.
if (levels.Sum(l => l.Count) != steps.Count())
// Check each step for cycles
var path = new Stack<string>();
foreach (var step in steps)
{
var processedSteps = new HashSet<string>(levels.SelectMany(l => l.Select(s => s.Name)));
var stepsInCycle = steps.Where(s => !processedSteps.Contains(s.Name)).Select(s => s.Name).ToList();

throw new InvalidOperationException(
$"Circular dependency detected in pipeline steps: {string.Join(", ", stepsInCycle)}");
if (visitStates[step.Name] == VisitState.Unvisited)
{
DetectCycles(step.Name, path);
}
}

return levels;
}

private static async Task ExecuteStepAsync(PipelineStep step, DeployingContext context)
Expand Down
Loading