Skip to content

Commit f87896f

Browse files
committed
enabling consuming task results in finally
Final tasks can be configured to consume Results of PipelineTasks from tasks section
1 parent 995e44f commit f87896f

File tree

9 files changed

+244
-67
lines changed

9 files changed

+244
-67
lines changed

docs/pipelines.md

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,36 @@ spec:
699699
value: "someURL"
700700
```
701701

702+
#### Consuming `Task` execution results in `finally`
703+
704+
Final tasks can be configured to consume `Results` of `PipelineTask` from `tasks` section:
705+
706+
```yaml
707+
spec:
708+
tasks:
709+
- name: count-comments-before
710+
taskRef:
711+
Name: count-comments
712+
- name: add-comment
713+
taskRef:
714+
Name: add-comment
715+
- name: count-comments-after
716+
taskRef:
717+
Name: count-comments
718+
finally:
719+
- name: check-count
720+
taskRef:
721+
Name: check-count
722+
params:
723+
- name: before-count
724+
value: $(tasks.count-comments-before.results.count)
725+
- name: after-count
726+
value: $(tasks.count-comments-after.results.count)
727+
```
728+
**Note** The scheduling of such final task does not change, it will still be executed in parallel with other
729+
final tasks after all non-final tasks are done.
730+
731+
702732
### `PipelineRun` Status with `finally`
703733

704734
With `finally`, `PipelineRun` status is calculated based on `PipelineTasks` under `tasks` section and final tasks.
@@ -776,35 +806,6 @@ no `runAfter` can be specified in final tasks.
776806
final tasks are guaranteed to be executed after all `PipelineTasks` therefore no `conditions` can be specified in
777807
final tasks.
778808

779-
#### Cannot configure `Task` execution results with `finally`
780-
781-
Final tasks can not be configured to consume `Results` of `PipelineTask` from `tasks` section i.e. the following
782-
example is not supported right now but we are working on adding support for the same (tracked in issue
783-
[#2557](https://github.com/tektoncd/pipeline/issues/2557)).
784-
785-
```yaml
786-
spec:
787-
tasks:
788-
- name: count-comments-before
789-
taskRef:
790-
Name: count-comments
791-
- name: add-comment
792-
taskRef:
793-
Name: add-comment
794-
- name: count-comments-after
795-
taskRef:
796-
Name: count-comments
797-
finally:
798-
- name: check-count
799-
taskRef:
800-
Name: check-count
801-
params:
802-
- name: before-count
803-
value: $(tasks.count-comments-before.results.count) #invalid
804-
- name: after-count
805-
value: $(tasks.count-comments-after.results.count) #invalid
806-
```
807-
808809
#### Cannot configure `Pipeline` result with `finally`
809810

810811
Final tasks can emit `Results` but results emitted from the final tasks can not be configured in the

pkg/apis/pipeline/v1beta1/pipeline_validation.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -308,32 +308,13 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError {
308308
}
309309
}
310310

311-
if err := validateTaskResultReferenceNotUsed(finalTasks).ViaField("finally"); err != nil {
312-
return err
313-
}
314-
315311
if err := validateTasksInputFrom(finalTasks).ViaField("finally"); err != nil {
316312
return err
317313
}
318314

319315
return nil
320316
}
321317

322-
func validateTaskResultReferenceNotUsed(tasks []PipelineTask) *apis.FieldError {
323-
for idx, t := range tasks {
324-
for _, p := range t.Params {
325-
expressions, ok := GetVarSubstitutionExpressionsForParam(p)
326-
if ok {
327-
if LooksLikeContainsResultRefs(expressions) {
328-
return apis.ErrInvalidValue(fmt.Sprintf("no task result allowed under params,"+
329-
"final task param %s has set task result as its value", p.Name), "params").ViaIndex(idx)
330-
}
331-
}
332-
}
333-
}
334-
return nil
335-
}
336-
337318
func validateTasksInputFrom(tasks []PipelineTask) (errs *apis.FieldError) {
338319
for idx, t := range tasks {
339320
inputResources := []PipelineTaskInputResource{}

pkg/apis/pipeline/v1beta1/pipeline_validation_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,19 +1838,6 @@ func TestValidateFinalTasks_Failure(t *testing.T) {
18381838
Message: `no from allowed under inputs, final task final-input-2 has from specified`,
18391839
Paths: []string{"finally[0].resources.inputs[0]"},
18401840
},
1841-
}, {
1842-
name: "invalid pipeline with final tasks having reference to task results",
1843-
finalTasks: []PipelineTask{{
1844-
Name: "final-task",
1845-
TaskRef: &TaskRef{Name: "final-task"},
1846-
Params: []Param{{
1847-
Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.a-task.results.output)"},
1848-
}},
1849-
}},
1850-
expectedError: apis.FieldError{
1851-
Message: `invalid value: no task result allowed under params,final task param param1 has set task result as its value`,
1852-
Paths: []string{"finally[0].params"},
1853-
},
18541841
}, {
18551842
name: "invalid pipeline with final task specifying when expressions",
18561843
finalTasks: []PipelineTask{{

pkg/reconciler/pipeline/dag/dag.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,33 @@ func (g *Graph) addPipelineTask(t Task) (*Node, error) {
6666
return newNode, nil
6767
}
6868

69-
// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
70-
func Build(tasks Tasks) (*Graph, error) {
69+
// BuildWithoutLinks builds a Pipeline Graph with the specified tasks as nodes (without any links).
70+
// Returns error if the pipeline is invalid
71+
func BuildWithoutLinks(tasks Tasks) (*Graph, error) {
7172
d := newGraph()
7273

73-
deps := map[string][]string{}
7474
// Add all Tasks mentioned in the `PipelineSpec`
7575
for _, pt := range tasks.Items() {
7676
if _, err := d.addPipelineTask(pt); err != nil {
7777
return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
7878
}
79+
}
80+
return d, nil
81+
}
82+
83+
// Build returns a valid pipeline Graph with all the dependencies converted into appropriate links.
84+
// Returns error if the pipeline is invalid
85+
func Build(tasks Tasks) (*Graph, error) {
86+
d, err := BuildWithoutLinks(tasks)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
deps := map[string][]string{}
92+
for _, pt := range tasks.Items() {
7993
deps[pt.HashKey()] = pt.Deps()
8094
}
95+
8196
// Process all from and runAfter constraints to add task dependency
8297
for pt, taskDeps := range deps {
8398
for _, previousTask := range taskDeps {

pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
307307
// if a task in PipelineRunState is final task or not
308308
// the finally section is optional and might not exist
309309
// dfinally holds an empty Graph in the absence of finally clause
310-
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
310+
dfinally, err := dag.BuildWithoutLinks(v1beta1.PipelineTaskList(pipelineSpec.Finally))
311311
if err != nil {
312312
// This Run has failed, so we need to mark it as failed and stop reconciling it
313313
pr.Status.MarkFailed(ReasonInvalidGraph,
@@ -521,7 +521,20 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
521521
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
522522

523523
// GetFinalTasks only returns tasks when a DAG is complete
524-
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
524+
finallyRprts := pipelineRunFacts.GetFinalTasks()
525+
526+
// Before creating TaskRun for scheduled final task, check if it's consuming a task result
527+
// Resolve and apply task result wherever applicable, report failure in case resolution fails
528+
for _, rprt := range finallyRprts {
529+
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunFacts.State, resources.PipelineRunState{rprt})
530+
if err != nil {
531+
rprt.InvalidTaskResultsInFinally = true
532+
logger.Warnf("Declaring final task %q failed as it failed to resolve task params for %q with error %v", rprt.PipelineTask.Name, pr.Name, err)
533+
continue
534+
}
535+
resources.ApplyTaskResults(resources.PipelineRunState{rprt}, resolvedResultRefs)
536+
nextRprts = append(nextRprts, rprt)
537+
}
525538

526539
for _, rprt := range nextRprts {
527540
if rprt == nil || rprt.Skip(pipelineRunFacts) {

pkg/reconciler/pipelinerun/pipelinerun_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3925,6 +3925,132 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) {
39253925
}
39263926
}
39273927

3928+
func TestReconcileWithTaskResultsInFinalTasks(t *testing.T) {
3929+
names.TestingSeed()
3930+
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
3931+
tb.PipelineTask("dag-task-1", "dag-task"),
3932+
tb.PipelineTask("dag-task-2", "dag-task"),
3933+
tb.FinalPipelineTask("final-task-1", "final-task-1",
3934+
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-1.results.aResult)"),
3935+
),
3936+
tb.FinalPipelineTask("final-task-2", "final-task-2",
3937+
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-2.results.aResult)"),
3938+
),
3939+
))}
3940+
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-final-task-results", tb.PipelineRunNamespace("foo"),
3941+
tb.PipelineRunSpec("test-pipeline",
3942+
tb.PipelineRunServiceAccountName("test-sa-0"),
3943+
),
3944+
)}
3945+
ts := []*v1beta1.Task{
3946+
tb.Task("dag-task", tb.TaskNamespace("foo")),
3947+
tb.Task("final-task-1", tb.TaskNamespace("foo"),
3948+
tb.TaskSpec(
3949+
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
3950+
),
3951+
),
3952+
tb.Task("final-task-2", tb.TaskNamespace("foo"),
3953+
tb.TaskSpec(
3954+
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
3955+
),
3956+
),
3957+
}
3958+
trs := []*v1beta1.TaskRun{
3959+
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-1-xxyyy",
3960+
tb.TaskRunNamespace("foo"),
3961+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
3962+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
3963+
tb.Controller, tb.BlockOwnerDeletion,
3964+
),
3965+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
3966+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
3967+
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-1"),
3968+
tb.TaskRunSpec(
3969+
tb.TaskRunTaskRef("hello-world"),
3970+
tb.TaskRunServiceAccountName("test-sa"),
3971+
),
3972+
tb.TaskRunStatus(
3973+
tb.StatusCondition(
3974+
apis.Condition{
3975+
Type: apis.ConditionSucceeded,
3976+
Status: corev1.ConditionTrue,
3977+
},
3978+
),
3979+
tb.TaskRunResult("aResult", "aResultValue"),
3980+
),
3981+
),
3982+
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-2-xxyyy",
3983+
tb.TaskRunNamespace("foo"),
3984+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
3985+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
3986+
tb.Controller, tb.BlockOwnerDeletion,
3987+
),
3988+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
3989+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
3990+
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-2"),
3991+
tb.TaskRunSpec(
3992+
tb.TaskRunTaskRef("hello-world"),
3993+
tb.TaskRunServiceAccountName("test-sa"),
3994+
),
3995+
tb.TaskRunStatus(
3996+
tb.StatusCondition(
3997+
apis.Condition{
3998+
Type: apis.ConditionSucceeded,
3999+
Status: corev1.ConditionFalse,
4000+
},
4001+
),
4002+
),
4003+
),
4004+
}
4005+
d := test.Data{
4006+
PipelineRuns: prs,
4007+
Pipelines: ps,
4008+
Tasks: ts,
4009+
TaskRuns: trs,
4010+
}
4011+
prt := NewPipelineRunTest(d, t)
4012+
defer prt.Cancel()
4013+
4014+
reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run-final-task-results", []string{}, false)
4015+
4016+
expectedTaskRunName := "test-pipeline-run-final-task-results-final-task-1-9l9zj"
4017+
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
4018+
tb.TaskRunNamespace("foo"),
4019+
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
4020+
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
4021+
tb.Controller, tb.BlockOwnerDeletion,
4022+
),
4023+
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
4024+
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
4025+
tb.TaskRunLabel("tekton.dev/pipelineTask", "final-task-1"),
4026+
tb.TaskRunSpec(
4027+
tb.TaskRunTaskRef("final-task-1"),
4028+
tb.TaskRunServiceAccountName("test-sa-0"),
4029+
tb.TaskRunParam("finalParam", "aResultValue"),
4030+
),
4031+
)
4032+
// Check that the expected TaskRun was created
4033+
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(metav1.ListOptions{
4034+
LabelSelector: "tekton.dev/pipelineTask=final-task-1,tekton.dev/pipelineRun=test-pipeline-run-final-task-results",
4035+
Limit: 1,
4036+
})
4037+
4038+
if err != nil {
4039+
t.Fatalf("Failure to list TaskRun's %s", err)
4040+
}
4041+
if len(actual.Items) != 1 {
4042+
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
4043+
}
4044+
actualTaskRun := actual.Items[0]
4045+
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
4046+
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
4047+
}
4048+
4049+
if s := reconciledRun.Status.TaskRuns["test-pipeline-run-final-task-results-final-task-2-mz4c7"].Status.GetCondition(apis.ConditionSucceeded); s.Status != corev1.ConditionFalse {
4050+
t.Fatalf("Status expected to be %s but is %s", corev1.ConditionFalse, s.Status)
4051+
}
4052+
}
4053+
39284054
// NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data
39294055
// This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun
39304056
func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest {

pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type ResolvedPipelineRunTask struct {
6868
ResolvedTaskResources *resources.ResolvedTaskResources
6969
// ConditionChecks ~~TaskRuns but for evaling conditions
7070
ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod?
71+
// this flag is set for a task when task result resolution fails for that task
72+
// and mainly used to create a task run object with failure for that task
73+
InvalidTaskResultsInFinally bool
7174
}
7275

7376
func (t ResolvedPipelineRunTask) IsDone() bool {

pkg/reconciler/pipelinerun/resources/pipelinerunstate.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ import (
2828
"knative.dev/pkg/apis"
2929
)
3030

31+
const (
32+
// ReasonInvalidTaskResultsInFinallyTask indicates that the final task was not able to resolve
33+
// task result of a DAG task
34+
ReasonInvalidTaskResultsInFinallyTask = "InvalidTaskResultsInFinallyTask"
35+
)
36+
3137
// PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution
3238
// state of the PipelineRun.
3339
type PipelineRunState []*ResolvedPipelineRunTask
@@ -283,7 +289,7 @@ func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask {
283289
func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[string]*v1beta1.PipelineRunTaskRunStatus {
284290
status := make(map[string]*v1beta1.PipelineRunTaskRunStatus)
285291
for _, rprt := range state {
286-
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
292+
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil && !rprt.InvalidTaskResultsInFinally {
287293
continue
288294
}
289295

@@ -324,6 +330,18 @@ func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[str
324330
})
325331
}
326332
}
333+
// Maintain a TaskRun Object in pr.Status for a finally task which could not resolve task results
334+
if rprt.InvalidTaskResultsInFinally {
335+
if prtrs.Status == nil {
336+
prtrs.Status = &v1beta1.TaskRunStatus{}
337+
}
338+
prtrs.Status.SetCondition(&apis.Condition{
339+
Type: apis.ConditionSucceeded,
340+
Status: corev1.ConditionFalse,
341+
Reason: ReasonInvalidTaskResultsInFinallyTask,
342+
Message: fmt.Sprintf("Finally Task %s in PipelineRun %s was having invalid task results", rprt.PipelineTask.Name, pr.Name),
343+
})
344+
}
327345
status[rprt.TaskRunName] = prtrs
328346
}
329347
return status

0 commit comments

Comments
 (0)