Skip to content

Commit 78f5479

Browse files
pritidesaitekton-robot
authored andcommitted
refactoring pipelinerunstate
Introducing PipelineRunFacts as a combination of PipelineRunState, DAG Graph, and finally Graph. This simplifies function definitions without having to pass graphs to PipelineRunState attributes.
1 parent fa9dffe commit 78f5479

File tree

5 files changed

+189
-133
lines changed

5 files changed

+189
-133
lines changed

pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
428428
return controller.NewPermanentError(err)
429429
}
430430

431-
for _, rprt := range pipelineRunState {
431+
// Build PipelineRunFacts with a list of resolved pipeline tasks,
432+
// dag tasks graph and final tasks graph
433+
pipelineRunFacts := &resources.PipelineRunFacts{
434+
State: pipelineRunState,
435+
TasksGraph: d,
436+
FinalTasksGraph: dfinally,
437+
}
438+
439+
for _, rprt := range pipelineRunFacts.State {
432440
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
433441
if err != nil {
434442
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
@@ -437,7 +445,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
437445
}
438446
}
439447

440-
if pipelineRunState.IsBeforeFirstTaskRun() {
448+
if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
441449
if pr.HasVolumeClaimTemplate() {
442450
// create workspace PVC from template
443451
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {
@@ -467,11 +475,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
467475
return controller.NewPermanentError(err)
468476
}
469477

470-
if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineRunState, as); err != nil {
478+
if err := c.runNextSchedulableTask(ctx, pr, pipelineRunFacts, as); err != nil {
471479
return err
472480
}
473481

474-
after := pipelineRunState.GetPipelineConditionStatus(pr, logger, d, dfinally)
482+
after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
475483
switch after.Status {
476484
case corev1.ConditionTrue:
477485
pr.Status.MarkSucceeded(after.Reason, after.Message)
@@ -482,37 +490,28 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
482490
}
483491
// Read the condition the way it was set by the Mark* helpers
484492
after = pr.Status.GetCondition(apis.ConditionSucceeded)
485-
pr.Status.TaskRuns = pipelineRunState.GetTaskRunsStatus(pr)
486-
pr.Status.SkippedTasks = pipelineRunState.GetSkippedTasks(pr, d)
493+
pr.Status.TaskRuns = pipelineRunFacts.State.GetTaskRunsStatus(pr)
494+
pr.Status.SkippedTasks = pipelineRunFacts.GetSkippedTasks()
487495
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
488496
return nil
489497
}
490498

491499
// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
492500
// pipeline run state, and starts them
493501
// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
494-
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineRunState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
502+
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineRunFacts *resources.PipelineRunFacts, as artifacts.ArtifactStorageInterface) error {
495503

496504
logger := logging.FromContext(ctx)
497505
recorder := controller.GetEventRecorder(ctx)
498506

499-
var nextRprts []*resources.ResolvedPipelineRunTask
500-
501-
// when pipeline run is stopping, do not schedule any new task and only
502-
// wait for all running tasks to complete and report their status
503-
if !pipelineRunState.IsStopping(d) {
504-
// candidateTasks is initialized to DAG root nodes to start pipeline execution
505-
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
506-
candidateTasks, err := dag.GetSchedulable(d, pipelineRunState.SuccessfulOrSkippedDAGTasks(d)...)
507-
if err != nil {
508-
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
509-
return controller.NewPermanentError(err)
510-
}
511-
// nextRprts holds a list of pipeline tasks which should be executed next
512-
nextRprts = pipelineRunState.GetNextTasks(candidateTasks)
507+
// nextRprts holds a list of pipeline tasks which should be executed next
508+
nextRprts, err := pipelineRunFacts.DAGExecutionQueue()
509+
if err != nil {
510+
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
511+
return controller.NewPermanentError(err)
513512
}
514513

515-
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, nextRprts)
514+
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunFacts.State, nextRprts)
516515
if err != nil {
517516
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
518517
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
@@ -522,10 +521,10 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
522521
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
523522

524523
// GetFinalTasks only returns tasks when a DAG is complete
525-
nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...)
524+
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
526525

527526
for _, rprt := range nextRprts {
528-
if rprt == nil || rprt.Skip(pipelineRunState, d) {
527+
if rprt == nil || rprt.Skip(pipelineRunFacts) {
529528
continue
530529
}
531530
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {

pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/tektoncd/pipeline/pkg/contexts"
3131
"github.com/tektoncd/pipeline/pkg/list"
3232
"github.com/tektoncd/pipeline/pkg/names"
33-
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
3433
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
3534
)
3635

@@ -134,17 +133,15 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
134133
return true
135134
}
136135

137-
func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *dag.Graph) bool {
138-
stateMap := state.ToMap()
136+
func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool {
137+
stateMap := facts.State.ToMap()
139138
// check if parent tasks are done executing,
140139
// if any of the parents is not yet scheduled or still running,
141140
// wait for it to complete before evaluating when expressions
142-
node := d.Nodes[t.PipelineTask.Name]
143-
if isTaskInGraph(t.PipelineTask.Name, d) {
144-
for _, p := range node.Prev {
145-
if !stateMap[p.Task.HashKey()].IsDone() {
146-
return false
147-
}
141+
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
142+
for _, p := range node.Prev {
143+
if !stateMap[p.Task.HashKey()].IsDone() {
144+
return false
148145
}
149146
}
150147
return true
@@ -156,7 +153,12 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *da
156153
// (3) its parent task was skipped
157154
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
158155
// Note that this means Skip returns false if a conditionCheck is in progress
159-
func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) bool {
156+
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
157+
// finally tasks are never skipped. If this is a final task, return false
158+
if facts.isFinalTask(t.PipelineTask.Name) {
159+
return false
160+
}
161+
160162
// it already has TaskRun associated with it - PipelineTask not skipped
161163
if t.IsStarted() {
162164
return false
@@ -170,7 +172,7 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
170172
}
171173

172174
// Check if the when expressions are false, based on the input's relationship to the values
173-
if t.checkParentsDone(state, d) {
175+
if t.checkParentsDone(facts) {
174176
if len(t.PipelineTask.WhenExpressions) > 0 {
175177
if !t.PipelineTask.WhenExpressions.HaveVariables() {
176178
if !t.PipelineTask.WhenExpressions.AllowsExecution() {
@@ -181,19 +183,17 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
181183
}
182184

183185
// Skip the PipelineTask if pipeline is in stopping state
184-
if isTaskInGraph(t.PipelineTask.Name, d) && state.IsStopping(d) {
186+
if facts.IsStopping() {
185187
return true
186188
}
187189

188-
stateMap := state.ToMap()
190+
stateMap := facts.State.ToMap()
189191
// Recursively look at parent tasks to see if they have been skipped,
190192
// if any of the parents have been skipped, skip as well
191-
node := d.Nodes[t.PipelineTask.Name]
192-
if isTaskInGraph(t.PipelineTask.Name, d) {
193-
for _, p := range node.Prev {
194-
if stateMap[p.Task.HashKey()].Skip(state, d) {
195-
return true
196-
}
193+
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
194+
for _, p := range node.Prev {
195+
if stateMap[p.Task.HashKey()].Skip(facts) {
196+
return true
197197
}
198198
}
199199
return false

pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ func TestIsSkipped(t *testing.T) {
835835

836836
for _, tc := range tcs {
837837
t.Run(tc.name, func(t *testing.T) {
838-
dag, err := DagFromState(tc.state)
838+
d, err := DagFromState(tc.state)
839839
if err != nil {
840840
t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err)
841841
}
@@ -844,7 +844,12 @@ func TestIsSkipped(t *testing.T) {
844844
if rprt == nil {
845845
t.Fatalf("Could not get task %s from the state: %v", tc.taskName, tc.state)
846846
}
847-
isSkipped := rprt.Skip(tc.state, dag)
847+
facts := PipelineRunFacts{
848+
State: tc.state,
849+
TasksGraph: d,
850+
FinalTasksGraph: &dag.Graph{},
851+
}
852+
isSkipped := rprt.Skip(&facts)
848853
if d := cmp.Diff(isSkipped, tc.expected); d != "" {
849854
t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d))
850855
}

0 commit comments

Comments
 (0)