Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,25 @@ For instructions on using variable substitutions see the relevant section of [th
| `TaskRun` | `spec.workspaces[].csi.driver` |
| `TaskRun` | `spec.workspaces[].csi.nodePublishSecretRef.name` |
| `Pipeline` | `spec.tasks[].params[].value` |
| `Pipeline` | `spec.tasks[].conditions[].params[].value` |
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The replacement of spec.tasks[].conditions[].params[].value is a capability of v1alpha1, which no longer has that field.

The current complete definition of the pipelinetask structure is as follows:

// PipelineTask defines a task in a Pipeline, passing inputs from both
// Params and from the output of previous tasks.
type PipelineTask struct {
// Name is the name of this task within the context of a Pipeline. Name is
// used as a coordinate with the `from` and `runAfter` fields to establish
// the execution order of tasks relative to one another.
Name string `json:"name,omitempty"`
// DisplayName is the display name of this task within the context of a Pipeline.
// This display name may be used to populate a UI.
// +optional
DisplayName string `json:"displayName,omitempty"`
// Description is the description of this task within the context of a Pipeline.
// This description may be used to populate a UI.
// +optional
Description string `json:"description,omitempty"`
// TaskRef is a reference to a task definition.
// +optional
TaskRef *TaskRef `json:"taskRef,omitempty"`
// TaskSpec is a specification of a task
// Specifying TaskSpec can be disabled by setting
// `disable-inline-spec` feature flag..
// +optional
TaskSpec *EmbeddedTask `json:"taskSpec,omitempty"`
// When is a list of when expressions that need to be true for the task to run
// +optional
When WhenExpressions `json:"when,omitempty"`
// Retries represents how many times this task should be retried in case of task failure: ConditionSucceeded set to False
// +optional
Retries int `json:"retries,omitempty"`
// RunAfter is the list of PipelineTask names that should be executed before
// this Task executes. (Used to force a specific ordering in graph execution.)
// +optional
// +listType=atomic
RunAfter []string `json:"runAfter,omitempty"`
// Parameters declares parameters passed to this task.
// +optional
// +listType=atomic
Params Params `json:"params,omitempty"`
// Matrix declares parameters used to fan out this task.
// +optional
Matrix *Matrix `json:"matrix,omitempty"`
// Workspaces maps workspaces from the pipeline spec to the workspaces
// declared in the Task.
// +optional
// +listType=atomic
Workspaces []WorkspacePipelineTaskBinding `json:"workspaces,omitempty"`
// Time after which the TaskRun times out. Defaults to 1 hour.
// Refer Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty"`
// PipelineRef is a reference to a pipeline definition
// Note: PipelineRef is in preview mode and not yet supported
// +optional
PipelineRef *PipelineRef `json:"pipelineRef,omitempty"`
// PipelineSpec is a specification of a pipeline
// Note: PipelineSpec is in preview mode and not yet supported
// Specifying PipelineSpec can be disabled by setting
// `disable-inline-spec` feature flag..
// +optional
PipelineSpec *PipelineSpec `json:"pipelineSpec,omitempty"`
// OnError defines the exiting behavior of a PipelineRun on error
// can be set to [ continue | stopAndFail ]
// +optional
OnError PipelineTaskOnErrorType `json:"onError,omitempty"`
}

The previous code for replacing variables in PipelineTask is as follows:

// ApplyReplacements replaces placeholders for declared parameters with the specified replacements.
func ApplyReplacements(p *v1.PipelineSpec, replacements map[string]string, arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) *v1.PipelineSpec {
p = p.DeepCopy()
for i := range p.Tasks {
p.Tasks[i].Params = p.Tasks[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Tasks[i].IsMatrixed() {
p.Tasks[i].Matrix.Params = p.Tasks[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Tasks[i].Matrix.Include {
p.Tasks[i].Matrix.Include[j].Params = p.Tasks[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Tasks[i].DisplayName = substitution.ApplyReplacements(p.Tasks[i].DisplayName, replacements)
}
for j := range p.Tasks[i].Workspaces {
p.Tasks[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Tasks[i].Workspaces[j].SubPath, replacements)
}
p.Tasks[i].When = p.Tasks[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Tasks[i].TaskRef != nil {
if p.Tasks[i].TaskRef.Params != nil {
p.Tasks[i].TaskRef.Params = p.Tasks[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Tasks[i].TaskRef.Name = substitution.ApplyReplacements(p.Tasks[i].TaskRef.Name, replacements)
}
p.Tasks[i] = propagateParams(p.Tasks[i], replacements, arrayReplacements, objectReplacements)
}
for i := range p.Finally {
p.Finally[i].Params = p.Finally[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Finally[i].IsMatrixed() {
p.Finally[i].Matrix.Params = p.Finally[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Finally[i].Matrix.Include {
p.Finally[i].Matrix.Include[j].Params = p.Finally[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Finally[i].DisplayName = substitution.ApplyReplacements(p.Finally[i].DisplayName, replacements)
}
for j := range p.Finally[i].Workspaces {
p.Finally[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Finally[i].Workspaces[j].SubPath, replacements)
}
p.Finally[i].When = p.Finally[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Finally[i].TaskRef != nil {
if p.Finally[i].TaskRef.Params != nil {
p.Finally[i].TaskRef.Params = p.Finally[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Finally[i].TaskRef.Name = substitution.ApplyReplacements(p.Finally[i].TaskRef.Name, replacements)
}
p.Finally[i] = propagateParams(p.Finally[i], replacements, arrayReplacements, objectReplacements)
}
return p
}

| `Pipeline` | `spec.results[].value` |
| `Pipeline` | `spec.tasks[].matrix.params[].value` |
| `Pipeline` | `spec.tasks[].matrix.include[].params[].value` |
| `Pipeline` | `spec.tasks[].displayName` |
| `Pipeline` | `spec.tasks[].workspaces[].subPath` |
| `Pipeline` | `spec.tasks[].when[].input` |
| `Pipeline` | `spec.tasks[].when[].values` |
| `Pipeline` | `spec.tasks[].workspaces[].subPath` |
| `Pipeline` | `spec.tasks[].displayName` |
| `Pipeline` | `spec.tasks[].taskRef.params[].values` |
| `Pipeline` | `spec.tasks[].taskRef.name` |
| `Pipeline` | `spec.tasks[].onError` |
| `Pipeline` | `spec.finally[].params[].value` |
| `Pipeline` | `spec.finally[].matrix.params[].value` |
| `Pipeline` | `spec.finally[].matrix.include[].params[].value` |
| `Pipeline` | `spec.finally[].displayName` |
| `Pipeline` | `spec.finally[].workspaces[].subPath` |
| `Pipeline` | `spec.finally[].when[].input` |
| `Pipeline` | `spec.finally[].when[].values` |
| `Pipeline` | `spec.finally[].taskRef.params[].values` |
| `Pipeline` | `spec.finally[].taskRef.name` |
| `Pipeline` | `spec.finally[].onError` |
| `PipelineRun` | `spec.workspaces[].subPath` |
| `PipelineRun` | `spec.workspaces[].persistentVolumeClaim.claimName` |
| `PipelineRun` | `spec.workspaces[].configMap.name` |
Expand Down
24 changes: 15 additions & 9 deletions pkg/apis/pipeline/v1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,7 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {
ClusterTaskRefKind: true,
}

if pt.OnError != "" {
errs = errs.Also(config.ValidateEnabledAPIFields(ctx, "OnError", config.BetaAPIFields))
if pt.OnError != PipelineTaskContinue && pt.OnError != PipelineTaskStopAndFail {
errs = errs.Also(apis.ErrInvalidValue(pt.OnError, "OnError", "PipelineTask OnError must be either \"continue\" or \"stopAndFail\""))
}
if pt.OnError == PipelineTaskContinue && pt.Retries > 0 {
errs = errs.Also(apis.ErrGeneric("PipelineTask OnError cannot be set to \"continue\" when Retries is greater than 0"))
}
}
errs = errs.Also(pt.ValidateOnError(ctx))

// Pipeline task having taskRef/taskSpec with APIVersion is classified as custom task
switch {
Expand All @@ -217,6 +209,20 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {
return errs
}

// ValidateOnError validates the OnError field of a PipelineTask
func (pt PipelineTask) ValidateOnError(ctx context.Context) (errs *apis.FieldError) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's convenient to reuse this verification logic in the reconcile logic of PipelineRun.

if pt.OnError != "" && !isParamRefs(string(pt.OnError)) {
errs = errs.Also(config.ValidateEnabledAPIFields(ctx, "OnError", config.BetaAPIFields))
if pt.OnError != PipelineTaskContinue && pt.OnError != PipelineTaskStopAndFail {
errs = errs.Also(apis.ErrInvalidValue(pt.OnError, "OnError", "PipelineTask OnError must be either \"continue\" or \"stopAndFail\""))
}
if pt.OnError == PipelineTaskContinue && pt.Retries > 0 {
errs = errs.Also(apis.ErrGeneric("PipelineTask OnError cannot be set to \"continue\" when Retries is greater than 0"))
}
}
return errs
}

func (pt *PipelineTask) validateMatrix(ctx context.Context) (errs *apis.FieldError) {
if pt.IsMatrixed() {
// This is a beta feature and will fail validation if it's used in a pipeline spec
Expand Down
25 changes: 25 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
// Update pipelinespec of pipelinerun's status field
pr.Status.PipelineSpec = pipelineSpec

// validate pipelineSpec after apply parameters
if err := validatePipelineSpecAfterApplyParameters(ctx, pipelineSpec); err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what is called deferred validation.
Targeted validation is performed after applying parameters.
The reason for not calling pipelineSpec.Validate again is that many field validations are unnecessary.

if err := pipelineSpec.Validate(ctx); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(v1.PipelineRunReasonFailedValidation.String(),
"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
pipelineMeta.Namespace, pipelineMeta.Name, pipelineErrors.WrapUserError(err))
return controller.NewPermanentError(err)
}

// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(v1.PipelineRunReasonFailedValidation.String(),
"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
pipelineMeta.Namespace, pipelineMeta.Name, pipelineErrors.WrapUserError(err))
return controller.NewPermanentError(err)
}

// pipelineState holds a list of pipeline tasks after fetching their resolved Task specs.
// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
// pipelineState is instantiated and updated on every reconcile cycle
Expand Down Expand Up @@ -1679,3 +1688,19 @@ func conditionFromVerificationResult(verificationResult *trustedresources.Verifi
}
return condition, err
}

// validatePipelineSpecAfterApplyParameters validates the PipelineSpec after apply parameters
// Maybe some fields are modified during apply parameters, need to validate again. For example, tasks[].OnError.
func validatePipelineSpecAfterApplyParameters(ctx context.Context, pipelineSpec *v1.PipelineSpec) (errs *apis.FieldError) {
if pipelineSpec == nil {
errs = errs.Also(apis.ErrMissingField("PipelineSpec"))
return
}
tasks := make([]v1.PipelineTask, 0, len(pipelineSpec.Tasks)+len(pipelineSpec.Finally))
tasks = append(tasks, pipelineSpec.Tasks...)
tasks = append(tasks, pipelineSpec.Finally...)
for _, t := range tasks {
errs = errs.Also(t.ValidateOnError(ctx))
}
return errs
}
49 changes: 49 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17994,6 +17994,55 @@ func Test_runNextSchedulableTask(t *testing.T) {
}
}

func TestReconcile_InvalidOnErrorPipeline(t *testing.T) {
names.TestingSeed()

namespace := "foo"
prName := "test-pipeline-invalid-onerror"

prs := []*v1.PipelineRun{
parse.MustParseV1PipelineRun(t, `
metadata:
name: test-pipeline-invalid-onerror
namespace: foo
spec:
params:
- name: onerror
value: "invalid"
pipelineSpec:
tasks:
- name: echo
onError: $(params.onerror)
taskSpec:
steps:
- name: echo
image: ubuntu
script: |
echo "Hello, World!"
exit 1
`),
}

d := test.Data{
PipelineRuns: prs,
ConfigMaps: []*corev1.ConfigMap{newFeatureFlagsConfigMap()},
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{
"Normal Started",
"(?s)Warning Failed .*PipelineTask OnError must be either \"continue\" or \"stopAndFail\"",
"(?s)Warning InternalError .*OnError\nPipelineTask OnError must be either \"continue\" or \"stopAndFail\"",
}
reconciledRun, clients := prt.reconcileRun(namespace, prName, wantEvents, true)

// Check that the expected TaskRun was not created
taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, clients, namespace, prName)
validateTaskRunsCount(t, taskRuns, 0)
verifyTaskRunStatusesCount(t, reconciledRun.Status, 0)
}

func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) {
signed := unsigned.DeepCopy()
signed.Name = name
Expand Down
68 changes: 27 additions & 41 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,55 +298,41 @@ func ApplyWorkspaces(p *v1.PipelineSpec, pr *v1.PipelineRun) *v1.PipelineSpec {
return ApplyReplacements(p, replacements, map[string][]string{}, map[string]map[string]string{})
}

// ApplyReplacements replaces placeholders for declared parameters with the specified replacements.
func ApplyReplacements(p *v1.PipelineSpec, replacements map[string]string, arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) *v1.PipelineSpec {
p = p.DeepCopy()

for i := range p.Tasks {
p.Tasks[i].Params = p.Tasks[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Tasks[i].IsMatrixed() {
p.Tasks[i].Matrix.Params = p.Tasks[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Tasks[i].Matrix.Include {
p.Tasks[i].Matrix.Include[j].Params = p.Tasks[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
// replaceVariablesInPipelineTasks handles variable replacement for a slice of PipelineTasks in-place
func replaceVariablesInPipelineTasks(tasks []v1.PipelineTask, replacements map[string]string,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous code contained a lot of duplication; here it has been simply refactored.

arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) {
for i := range tasks {
tasks[i].Params = tasks[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if tasks[i].IsMatrixed() {
tasks[i].Matrix.Params = tasks[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range tasks[i].Matrix.Include {
tasks[i].Matrix.Include[j].Params = tasks[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Tasks[i].DisplayName = substitution.ApplyReplacements(p.Tasks[i].DisplayName, replacements)
tasks[i].DisplayName = substitution.ApplyReplacements(tasks[i].DisplayName, replacements)
}
for j := range p.Tasks[i].Workspaces {
p.Tasks[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Tasks[i].Workspaces[j].SubPath, replacements)
for j := range tasks[i].Workspaces {
tasks[i].Workspaces[j].SubPath = substitution.ApplyReplacements(tasks[i].Workspaces[j].SubPath, replacements)
}
p.Tasks[i].When = p.Tasks[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Tasks[i].TaskRef != nil {
if p.Tasks[i].TaskRef.Params != nil {
p.Tasks[i].TaskRef.Params = p.Tasks[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
tasks[i].When = tasks[i].When.ReplaceVariables(replacements, arrayReplacements)
if tasks[i].TaskRef != nil {
if tasks[i].TaskRef.Params != nil {
tasks[i].TaskRef.Params = tasks[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Tasks[i].TaskRef.Name = substitution.ApplyReplacements(p.Tasks[i].TaskRef.Name, replacements)
tasks[i].TaskRef.Name = substitution.ApplyReplacements(tasks[i].TaskRef.Name, replacements)
}
p.Tasks[i] = propagateParams(p.Tasks[i], replacements, arrayReplacements, objectReplacements)
tasks[i].OnError = v1.PipelineTaskOnErrorType(substitution.ApplyReplacements(string(tasks[i].OnError), replacements))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most important line of code in this new update is the support for variable replacement logic in OnError. 😆

tasks[i] = propagateParams(tasks[i], replacements, arrayReplacements, objectReplacements)
}
}

for i := range p.Finally {
p.Finally[i].Params = p.Finally[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Finally[i].IsMatrixed() {
p.Finally[i].Matrix.Params = p.Finally[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Finally[i].Matrix.Include {
p.Finally[i].Matrix.Include[j].Params = p.Finally[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Finally[i].DisplayName = substitution.ApplyReplacements(p.Finally[i].DisplayName, replacements)
}
for j := range p.Finally[i].Workspaces {
p.Finally[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Finally[i].Workspaces[j].SubPath, replacements)
}
p.Finally[i].When = p.Finally[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Finally[i].TaskRef != nil {
if p.Finally[i].TaskRef.Params != nil {
p.Finally[i].TaskRef.Params = p.Finally[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Finally[i].TaskRef.Name = substitution.ApplyReplacements(p.Finally[i].TaskRef.Name, replacements)
}
p.Finally[i] = propagateParams(p.Finally[i], replacements, arrayReplacements, objectReplacements)
}
// ApplyReplacements replaces placeholders for declared parameters with the specified replacements.
func ApplyReplacements(p *v1.PipelineSpec, replacements map[string]string, arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) *v1.PipelineSpec {
p = p.DeepCopy()

// Replace variables in Tasks and Finally tasks
replaceVariablesInPipelineTasks(p.Tasks, replacements, arrayReplacements, objectReplacements)
replaceVariablesInPipelineTasks(p.Finally, replacements, arrayReplacements, objectReplacements)

return p
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,26 @@ func TestApplyParameters(t *testing.T) {
}},
},
},
{
name: "parameter in onError",
original: v1.PipelineSpec{
Params: []v1.ParamSpec{
{Name: "onerror", Type: v1.ParamTypeString},
},
Tasks: []v1.PipelineTask{{
OnError: v1.PipelineTaskOnErrorType("$(params.onerror)"),
}},
},
params: v1.Params{{Name: "onerror", Value: *v1.NewStructuredValues("new-onerror-value")}},
expected: v1.PipelineSpec{
Params: []v1.ParamSpec{
{Name: "onerror", Type: v1.ParamTypeString},
},
Tasks: []v1.PipelineTask{{
OnError: v1.PipelineTaskOnErrorType("new-onerror-value"),
}},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
Expand Down
Loading