Skip to content

Commit ab888e7

Browse files
committed
enabling consuming task results in finally
Final tasks can be configured to consume Results of PipelineTasks from tasks section
1 parent 723fca9 commit ab888e7

File tree

9 files changed

+245
-64
lines changed

9 files changed

+245
-64
lines changed

docs/pipelines.md

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,36 @@ spec:
688688
value: "someURL"
689689
```
690690

691+
#### Consuming `Task` execution results in `finally`
692+
693+
Final tasks can be configured to consume `Results` of `PipelineTask` from `tasks` section:
694+
695+
```yaml
696+
spec:
697+
tasks:
698+
- name: count-comments-before
699+
taskRef:
700+
Name: count-comments
701+
- name: add-comment
702+
taskRef:
703+
Name: add-comment
704+
- name: count-comments-after
705+
taskRef:
706+
Name: count-comments
707+
finally:
708+
- name: check-count
709+
taskRef:
710+
Name: check-count
711+
params:
712+
- name: before-count
713+
value: $(tasks.count-comments-before.results.count)
714+
- name: after-count
715+
value: $(tasks.count-comments-after.results.count)
716+
```
717+
**Note** The scheduling of such final task does not change, it will still be executed in parallel with other
718+
final tasks after all non-final tasks are done.
719+
720+
691721
### `PipelineRun` Status with `finally`
692722

693723
With `finally`, `PipelineRun` status is calculated based on `PipelineTasks` under `tasks` section and final tasks.
@@ -765,35 +795,6 @@ no `runAfter` can be specified in final tasks.
765795
final tasks are guaranteed to be executed after all `PipelineTasks` therefore no `conditions` can be specified in
766796
final tasks.
767797

768-
#### Cannot configure `Task` execution results with `finally`
769-
770-
Final tasks can not be configured to consume `Results` of `PipelineTask` from `tasks` section i.e. the following
771-
example is not supported right now but we are working on adding support for the same (tracked in issue
772-
[#2557](https://github.com/tektoncd/pipeline/issues/2557)).
773-
774-
```yaml
775-
spec:
776-
tasks:
777-
- name: count-comments-before
778-
taskRef:
779-
Name: count-comments
780-
- name: add-comment
781-
taskRef:
782-
Name: add-comment
783-
- name: count-comments-after
784-
taskRef:
785-
Name: count-comments
786-
finally:
787-
- name: check-count
788-
taskRef:
789-
Name: check-count
790-
params:
791-
- name: before-count
792-
value: $(tasks.count-comments-before.results.count) #invalid
793-
- name: after-count
794-
value: $(tasks.count-comments-after.results.count) #invalid
795-
```
796-
797798
#### Cannot configure `Pipeline` result with `finally`
798799

799800
Final tasks can emit `Results` but results emitted from the final tasks can not be configured in the

pkg/apis/pipeline/v1beta1/pipeline_validation.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -472,32 +472,13 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError {
472472
}
473473
}
474474

475-
if err := validateTaskResultReferenceNotUsed(finalTasks); err != nil {
476-
return err
477-
}
478-
479475
if err := validateTasksInputFrom(finalTasks); err != nil {
480476
return err
481477
}
482478

483479
return nil
484480
}
485481

486-
func validateTaskResultReferenceNotUsed(tasks []PipelineTask) *apis.FieldError {
487-
for _, t := range tasks {
488-
for _, p := range t.Params {
489-
expressions, ok := GetVarSubstitutionExpressionsForParam(p)
490-
if ok {
491-
if LooksLikeContainsResultRefs(expressions) {
492-
return apis.ErrInvalidValue(fmt.Sprintf("no task result allowed under params,"+
493-
"final task param %s has set task result as its value", p.Name), "spec.finally.task.params")
494-
}
495-
}
496-
}
497-
}
498-
return nil
499-
}
500-
501482
func validateTasksInputFrom(tasks []PipelineTask) *apis.FieldError {
502483
for _, t := range tasks {
503484
inputResources := []PipelineTaskInputResource{}

pkg/apis/pipeline/v1beta1/pipeline_validation_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,15 +1524,6 @@ func TestValidateFinalTasks_Failure(t *testing.T) {
15241524
}},
15251525
},
15261526
}},
1527-
}, {
1528-
name: "invalid pipeline with final tasks having reference to task results",
1529-
finalTasks: []PipelineTask{{
1530-
Name: "final-task",
1531-
TaskRef: &TaskRef{Name: "final-task"},
1532-
Params: []Param{{
1533-
Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.a-task.results.output)"},
1534-
}},
1535-
}},
15361527
}, {
15371528
name: "invalid pipeline with final task specifying when expressions",
15381529
finalTasks: []PipelineTask{{

pkg/reconciler/pipeline/dag/dag.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,33 @@ func (g *Graph) addPipelineTask(t Task) (*Node, error) {
6666
return newNode, nil
6767
}
6868

69-
// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
70-
func Build(tasks Tasks) (*Graph, error) {
69+
// BuildWithoutLinks builds a Pipeline Graph with the specified tasks as nodes (without any links).
70+
// Returns error if the pipeline is invalid
71+
func BuildWithoutLinks(tasks Tasks) (*Graph, error) {
7172
d := newGraph()
7273

73-
deps := map[string][]string{}
7474
// Add all Tasks mentioned in the `PipelineSpec`
7575
for _, pt := range tasks.Items() {
7676
if _, err := d.addPipelineTask(pt); err != nil {
7777
return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
7878
}
79+
}
80+
return d, nil
81+
}
82+
83+
// Build returns a valid pipeline Graph with all the dependencies converted into appropriate links.
84+
// Returns error if the pipeline is invalid
85+
func Build(tasks Tasks) (*Graph, error) {
86+
d, err := BuildWithoutLinks(tasks)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
deps := map[string][]string{}
92+
for _, pt := range tasks.Items() {
7993
deps[pt.HashKey()] = pt.Deps()
8094
}
95+
8196
// Process all from and runAfter constraints to add task dependency
8297
for pt, taskDeps := range deps {
8398
for _, previousTask := range taskDeps {

pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
307307
// if a task in PipelineRunState is final task or not
308308
// the finally section is optional and might not exist
309309
// dfinally holds an empty Graph in the absence of finally clause
310-
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
310+
dfinally, err := dag.BuildWithoutLinks(v1beta1.PipelineTaskList(pipelineSpec.Finally))
311311
if err != nil {
312312
// This Run has failed, so we need to mark it as failed and stop reconciling it
313313
pr.Status.MarkFailed(ReasonInvalidGraph,
@@ -495,7 +495,7 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
495495
logger := logging.FromContext(ctx)
496496
recorder := controller.GetEventRecorder(ctx)
497497

498-
var nextRprts []*resources.ResolvedPipelineRunTask
498+
var nextRprts resources.PipelineRunState
499499

500500
// when pipeline run is stopping, do not schedule any new task and only
501501
// wait for all running tasks to complete and report their status
@@ -521,7 +521,20 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
521521
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
522522

523523
// GetFinalTasks only returns tasks when a DAG is complete
524-
nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...)
524+
finallyRprts := pipelineRunState.GetFinalTasks(d, dfinally)
525+
526+
// Before creating TaskRun for scheduled final task, check if it's consuming a task result
527+
// Resolve and apply task result wherever applicable, report failure in case resolution fails
528+
for _, rprt := range finallyRprts {
529+
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, resources.PipelineRunState{rprt})
530+
if err != nil {
531+
rprt.InvalidTaskResultsInFinally = true
532+
logger.Warnf("Declaring final task %q failed as it failed to resolve task params for %q with error %v", rprt.PipelineTask.Name, pr.Name, err)
533+
continue
534+
}
535+
resources.ApplyTaskResults(resources.PipelineRunState{rprt}, resolvedResultRefs)
536+
nextRprts = append(nextRprts, rprt)
537+
}
525538

526539
for _, rprt := range nextRprts {
527540
if rprt == nil || rprt.Skip(pipelineRunState, d) {

pkg/reconciler/pipelinerun/pipelinerun_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3923,6 +3923,132 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) {
39233923
}
39243924
}
39253925

3926+
func TestReconcileWithTaskResultsInFinalTasks(t *testing.T) {
3927+
names.TestingSeed()
3928+
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
3929+
tb.PipelineTask("dag-task-1", "dag-task"),
3930+
tb.PipelineTask("dag-task-2", "dag-task"),
3931+
tb.FinalPipelineTask("final-task-1", "final-task-1",
3932+
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-1.results.aResult)"),
3933+
),
3934+
tb.FinalPipelineTask("final-task-2", "final-task-2",
3935+
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-2.results.aResult)"),
3936+
),
3937+
))}
3938+
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-final-task-results", tb.PipelineRunNamespace("foo"),
3939+
tb.PipelineRunSpec("test-pipeline",
3940+
tb.PipelineRunServiceAccountName("test-sa-0"),
3941+
),
3942+
)}
3943+
ts := []*v1beta1.Task{
3944+
tb.Task("dag-task", tb.TaskNamespace("foo")),
3945+
tb.Task("final-task-1", tb.TaskNamespace("foo"),
3946+
tb.TaskSpec(
3947+
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
3948+
),
3949+
),
3950+
tb.Task("final-task-2", tb.TaskNamespace("foo"),
3951+
tb.TaskSpec(
3952+
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
3953+
),
3954+
),
3955+
}
3956+
trs := []*v1beta1.TaskRun{
3957+
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-1-xxyyy",
3958+
tb.TaskRunNamespace("foo"),
3959+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
3960+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
3961+
tb.Controller, tb.BlockOwnerDeletion,
3962+
),
3963+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
3964+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
3965+
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-1"),
3966+
tb.TaskRunSpec(
3967+
tb.TaskRunTaskRef("hello-world"),
3968+
tb.TaskRunServiceAccountName("test-sa"),
3969+
),
3970+
tb.TaskRunStatus(
3971+
tb.StatusCondition(
3972+
apis.Condition{
3973+
Type: apis.ConditionSucceeded,
3974+
Status: corev1.ConditionTrue,
3975+
},
3976+
),
3977+
tb.TaskRunResult("aResult", "aResultValue"),
3978+
),
3979+
),
3980+
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-2-xxyyy",
3981+
tb.TaskRunNamespace("foo"),
3982+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
3983+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
3984+
tb.Controller, tb.BlockOwnerDeletion,
3985+
),
3986+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
3987+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
3988+
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-2"),
3989+
tb.TaskRunSpec(
3990+
tb.TaskRunTaskRef("hello-world"),
3991+
tb.TaskRunServiceAccountName("test-sa"),
3992+
),
3993+
tb.TaskRunStatus(
3994+
tb.StatusCondition(
3995+
apis.Condition{
3996+
Type: apis.ConditionSucceeded,
3997+
Status: corev1.ConditionFalse,
3998+
},
3999+
),
4000+
),
4001+
),
4002+
}
4003+
d := test.Data{
4004+
PipelineRuns: prs,
4005+
Pipelines: ps,
4006+
Tasks: ts,
4007+
TaskRuns: trs,
4008+
}
4009+
prt := NewPipelineRunTest(d, t)
4010+
defer prt.Cancel()
4011+
4012+
reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run-final-task-results", []string{}, false)
4013+
4014+
expectedTaskRunName := "test-pipeline-run-final-task-results-final-task-1-9l9zj"
4015+
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
4016+
tb.TaskRunNamespace("foo"),
4017+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
4018+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
4019+
tb.Controller, tb.BlockOwnerDeletion,
4020+
),
4021+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
4022+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
4023+
tb.TaskRunLabel("tekton.dev/pipelineTask", "final-task-1"),
4024+
tb.TaskRunSpec(
4025+
tb.TaskRunTaskRef("final-task-1"),
4026+
tb.TaskRunServiceAccountName("test-sa-0"),
4027+
tb.TaskRunParam("finalParam", "aResultValue"),
4028+
),
4029+
)
4030+
// Check that the expected TaskRun was created
4031+
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(metav1.ListOptions{
4032+
LabelSelector: "tekton.dev/pipelineTask=final-task-1,tekton.dev/pipelineRun=test-pipeline-run-final-task-results",
4033+
Limit: 1,
4034+
})
4035+
4036+
if err != nil {
4037+
t.Fatalf("Failure to list TaskRun's %s", err)
4038+
}
4039+
if len(actual.Items) != 1 {
4040+
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
4041+
}
4042+
actualTaskRun := actual.Items[0]
4043+
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
4044+
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
4045+
}
4046+
4047+
if s := reconciledRun.Status.TaskRuns["test-pipeline-run-final-task-results-final-task-2-mz4c7"].Status.GetCondition(apis.ConditionSucceeded); s.Status != corev1.ConditionFalse {
4048+
t.Fatalf("Status expected to be %s but is %s", corev1.ConditionFalse, s.Status)
4049+
}
4050+
}
4051+
39264052
// NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data
39274053
// This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun
39284054
func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest {

pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type ResolvedPipelineRunTask struct {
6969
ResolvedTaskResources *resources.ResolvedTaskResources
7070
// ConditionChecks ~~TaskRuns but for evaling conditions
7171
ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod?
72+
// this flag is set for a task when task result resolution fails for that task
73+
// and mainly used to create a task run object with failure for that task
74+
InvalidTaskResultsInFinally bool
7275
}
7376

7477
func (t ResolvedPipelineRunTask) IsDone() bool {

pkg/reconciler/pipelinerun/resources/pipelinerunstate.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ import (
2828
"knative.dev/pkg/apis"
2929
)
3030

31+
const (
32+
// ReasonInvalidTaskResultsInFinallyTask indicates that the final task was not able to resolve
33+
// task result of a DAG task
34+
ReasonInvalidTaskResultsInFinallyTask = "InvalidTaskResultsInFinallyTask"
35+
)
36+
3137
// PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution
3238
// state of the PipelineRun.
3339
type PipelineRunState []*ResolvedPipelineRunTask
@@ -264,7 +270,7 @@ func (state PipelineRunState) GetSkippedTasks(pr *v1beta1.PipelineRun, d *dag.Gr
264270
func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[string]*v1beta1.PipelineRunTaskRunStatus {
265271
status := make(map[string]*v1beta1.PipelineRunTaskRunStatus)
266272
for _, rprt := range state {
267-
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
273+
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil && !rprt.InvalidTaskResultsInFinally {
268274
continue
269275
}
270276

@@ -305,6 +311,18 @@ func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[str
305311
})
306312
}
307313
}
314+
// Maintain a TaskRun Object in pr.Status for a finally task which could not resolve task results
315+
if rprt.InvalidTaskResultsInFinally {
316+
if prtrs.Status == nil {
317+
prtrs.Status = &v1beta1.TaskRunStatus{}
318+
}
319+
prtrs.Status.SetCondition(&apis.Condition{
320+
Type: apis.ConditionSucceeded,
321+
Status: corev1.ConditionFalse,
322+
Reason: ReasonInvalidTaskResultsInFinallyTask,
323+
Message: fmt.Sprintf("Finally Task %s in PipelineRun %s was having invalid task results", rprt.PipelineTask.Name, pr.Name),
324+
})
325+
}
308326
status[rprt.TaskRunName] = prtrs
309327
}
310328
return status

0 commit comments

Comments
 (0)