Skip to content

Commit 0733392

Browse files
committed
Only fetch the definitions once 🧙
Since we "dereference" fetched definition and store them into the `status`, we can use this as a source of truth. This allows us to fetch only once the definitions (Task, Pipeline) *and* has the benefit to make the runs (PipelineRun, TaskRun) immune to change to what they refer, while the are executing. Without this change, if you run a PipelineRun that reference a Pipeline that is deleted during execution, your PipelineRun will fail at some point because it cannot fetch the definition anymore, even if it stored them in its status. This is the case for TaskRun too. Signed-off-by: Vincent Demeester <[email protected]>
1 parent b7f3dba commit 0733392

File tree

8 files changed

+260
-32
lines changed

8 files changed

+260
-32
lines changed

‎pkg/reconciler/pipelinerun/pipelinerun.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
207207
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil)
208208
}
209209

210+
// If the pipelinerun is cancelled, cancel tasks and update status
210211
if pr.IsCancelled() {
211-
// If the pipelinerun is cancelled, cancel tasks and update status
212212
err := cancelPipelineRun(ctx, logger, pr, c.PipelineClientSet)
213213
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
214214
}
@@ -278,7 +278,7 @@ func (c *Reconciler) resolvePipelineState(
278278
pst := resources.PipelineRunState{}
279279
// Resolve each task individually because they each could have a different reference context (remote or local).
280280
for _, task := range tasks {
281-
fn, _, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, task.TaskRef, pr.Namespace, pr.Spec.ServiceAccountName)
281+
fn, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, task.TaskRef, pr.Namespace, pr.Spec.ServiceAccountName)
282282
if err != nil {
283283
// This Run has failed, so we need to mark it as failed and stop reconciling it
284284
pr.Status.MarkFailed(ReasonCouldntGetTask, "Pipeline %s/%s can't be Run; task %s could not be fetched: %s",

‎pkg/reconciler/pipelinerun/resources/pipelineref.go‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ func GetPipelineFunc(ctx context.Context, k8s kubernetes.Interface, tekton clien
3737
cfg := config.FromContextOrDefaults(ctx)
3838
pr := pipelineRun.Spec.PipelineRef
3939
namespace := pipelineRun.Namespace
40+
// if the spec is already in the status, do not try to fetch it again, just use it as source of truth
41+
if pipelineRun.Status.PipelineSpec != nil {
42+
return func(_ context.Context, name string) (v1beta1.PipelineObject, error) {
43+
return &v1beta1.Pipeline{
44+
ObjectMeta: metav1.ObjectMeta{
45+
Name: name,
46+
Namespace: namespace,
47+
},
48+
Spec: *pipelineRun.Status.PipelineSpec,
49+
}, nil
50+
}, nil
51+
}
4052
switch {
4153
case cfg.FeatureFlags.EnableTektonOCIBundles && pr != nil && pr.Bundle != "":
4254
// Return an inline function that implements GetTask by calling Resolver.Get with the specified task type and

‎pkg/reconciler/pipelinerun/resources/pipelineref_test.go‎

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,7 @@ func TestGetPipelineFunc(t *testing.T) {
146146
Name: "simple",
147147
},
148148
expected: tb.Pipeline("simple", tb.PipelineType, tb.PipelineNamespace("default"), tb.PipelineSpec(tb.PipelineTask("something", "something"))),
149-
},
150-
}
149+
}}
151150

152151
for _, tc := range testcases {
153152
t.Run(tc.name, func(t *testing.T) {
@@ -175,7 +174,7 @@ func TestGetPipelineFunc(t *testing.T) {
175174
t.Fatalf("failed to get pipeline fn: %s", err.Error())
176175
}
177176

178-
pipeline, err := fn(context.Background(), tc.ref.Name)
177+
pipeline, err := fn(ctx, tc.ref.Name)
179178
if err != nil {
180179
t.Fatalf("failed to call pipelinefn: %s", err.Error())
181180
}
@@ -186,3 +185,58 @@ func TestGetPipelineFunc(t *testing.T) {
186185
})
187186
}
188187
}
188+
189+
func TestGetPipelineFuncSpecAlreadyFetched(t *testing.T) {
190+
ctx := context.Background()
191+
ctx, cancel := context.WithCancel(ctx)
192+
defer cancel()
193+
tektonclient := fake.NewSimpleClientset(simplePipeline, dummyPipeline)
194+
kubeclient := fakek8s.NewSimpleClientset(&v1.ServiceAccount{
195+
ObjectMeta: metav1.ObjectMeta{
196+
Namespace: "default",
197+
Name: "default",
198+
},
199+
})
200+
201+
name := "anyname-really"
202+
pipelineSpec := v1beta1.PipelineSpec{
203+
Tasks: []v1beta1.PipelineTask{{
204+
Name: "task1",
205+
TaskRef: &v1beta1.TaskRef{Name: "task"},
206+
}},
207+
}
208+
pipelineRun := &v1beta1.PipelineRun{
209+
ObjectMeta: metav1.ObjectMeta{Namespace: "default"},
210+
Spec: v1beta1.PipelineRunSpec{
211+
PipelineRef: &v1beta1.PipelineRef{
212+
// Using simple here to show that, it won't fetch the simple pipelinespec,
213+
// which is different from the pipelineSpec above
214+
Name: "simple",
215+
},
216+
ServiceAccountName: "default",
217+
},
218+
Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
219+
PipelineSpec: &pipelineSpec,
220+
}},
221+
}
222+
expectedPipeline := &v1beta1.Pipeline{
223+
ObjectMeta: metav1.ObjectMeta{
224+
Name: name,
225+
Namespace: "default",
226+
},
227+
Spec: pipelineSpec,
228+
}
229+
230+
fn, err := resources.GetPipelineFunc(ctx, kubeclient, tektonclient, pipelineRun)
231+
if err != nil {
232+
t.Fatalf("failed to get pipeline fn: %s", err.Error())
233+
}
234+
actualPipeline, err := fn(ctx, name)
235+
if err != nil {
236+
t.Fatalf("failed to call pipelinefn: %s", err.Error())
237+
}
238+
239+
if diff := cmp.Diff(actualPipeline, expectedPipeline); expectedPipeline != nil && diff != "" {
240+
t.Error(diff)
241+
}
242+
}

‎pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go‎

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -386,16 +386,32 @@ func ResolvePipelineRunTask(
386386
kind v1beta1.TaskKind
387387
)
388388

389+
taskRun, err := getTaskRun(rprt.TaskRunName)
390+
if err != nil {
391+
if !errors.IsNotFound(err) {
392+
return nil, fmt.Errorf("error retrieving TaskRun %s: %w", rprt.TaskRunName, err)
393+
}
394+
}
395+
if taskRun != nil {
396+
rprt.TaskRun = taskRun
397+
}
398+
389399
if task.TaskRef != nil {
390-
t, err = getTask(ctx, task.TaskRef.Name)
391-
if err != nil {
392-
return nil, &TaskNotFoundError{
393-
Name: task.TaskRef.Name,
394-
Msg: err.Error(),
400+
// If the TaskRun has already a store TaskSpec in its status, use it as source of truth
401+
if taskRun != nil && taskRun.Status.TaskSpec != nil {
402+
spec = *taskRun.Status.TaskSpec
403+
taskName = task.TaskRef.Name
404+
} else {
405+
t, err = getTask(ctx, task.TaskRef.Name)
406+
if err != nil {
407+
return nil, &TaskNotFoundError{
408+
Name: task.TaskRef.Name,
409+
Msg: err.Error(),
410+
}
395411
}
412+
spec = t.TaskSpec()
413+
taskName = t.TaskMetadata().Name
396414
}
397-
spec = t.TaskSpec()
398-
taskName = t.TaskMetadata().Name
399415
kind = task.TaskRef.Kind
400416
} else {
401417
spec = task.TaskSpec.TaskSpec
@@ -408,16 +424,6 @@ func ResolvePipelineRunTask(
408424

409425
rprt.ResolvedTaskResources = rtr
410426

411-
taskRun, err := getTaskRun(rprt.TaskRunName)
412-
if err != nil {
413-
if !errors.IsNotFound(err) {
414-
return nil, fmt.Errorf("error retrieving TaskRun %s: %w", rprt.TaskRunName, err)
415-
}
416-
}
417-
if taskRun != nil {
418-
rprt.TaskRun = taskRun
419-
}
420-
421427
// Get all conditions that this pipelineTask will be using, if any
422428
if len(task.Conditions) > 0 {
423429
rcc, err := resolveConditionChecks(&task, pipelineRun.Status.TaskRuns, rprt.TaskRunName, getTaskRun, getCondition, providedResources)

‎pkg/reconciler/taskrun/resources/taskref.go‎

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,40 @@ import (
3131
"k8s.io/client-go/kubernetes"
3232
)
3333

34+
// GetTaskKind returns the referenced Task kind (Task, ClusterTask, ...) if the TaskRun is using TaskRef.
35+
func GetTaskKind(taskrun *v1beta1.TaskRun) v1beta1.TaskKind {
36+
kind := v1alpha1.NamespacedTaskKind
37+
if taskrun.Spec.TaskRef != nil && taskrun.Spec.TaskRef.Kind != "" {
38+
kind = taskrun.Spec.TaskRef.Kind
39+
}
40+
return kind
41+
}
42+
43+
// GetTaskFuncFromTaskRun is a factory function that will use the given TaskRef as context to return a valid GetTask function. It
44+
// also requires a kubeclient, tektonclient, namespace, and service account in case it needs to find that task in
45+
// cluster or authorize against an external repositroy. It will figure out whether it needs to look in the cluster or in
46+
// a remote image to fetch the reference. It will also return the "kind" of the task being referenced.
47+
func GetTaskFuncFromTaskRun(ctx context.Context, k8s kubernetes.Interface, tekton clientset.Interface, taskrun *v1beta1.TaskRun) (GetTask, error) {
48+
// if the spec is already in the status, do not try to fetch it again, just use it as source of truth
49+
if taskrun.Status.TaskSpec != nil {
50+
return func(_ context.Context, name string) (v1beta1.TaskObject, error) {
51+
return &v1beta1.Task{
52+
ObjectMeta: metav1.ObjectMeta{
53+
Name: name,
54+
Namespace: taskrun.Namespace,
55+
},
56+
Spec: *taskrun.Status.TaskSpec,
57+
}, nil
58+
}, nil
59+
}
60+
return GetTaskFunc(ctx, k8s, tekton, taskrun.Spec.TaskRef, taskrun.Namespace, taskrun.Spec.ServiceAccountName)
61+
}
62+
3463
// GetTaskFunc is a factory function that will use the given TaskRef as context to return a valid GetTask function. It
3564
// also requires a kubeclient, tektonclient, namespace, and service account in case it needs to find that task in
3665
// cluster or authorize against an external repositroy. It will figure out whether it needs to look in the cluster or in
3766
// a remote image to fetch the reference. It will also return the "kind" of the task being referenced.
38-
func GetTaskFunc(ctx context.Context, k8s kubernetes.Interface, tekton clientset.Interface, tr *v1beta1.TaskRef, namespace, saName string) (GetTask, v1beta1.TaskKind, error) {
67+
func GetTaskFunc(ctx context.Context, k8s kubernetes.Interface, tekton clientset.Interface, tr *v1beta1.TaskRef, namespace, saName string) (GetTask, error) {
3968
cfg := config.FromContextOrDefaults(ctx)
4069
kind := v1alpha1.NamespacedTaskKind
4170
if tr != nil && tr.Kind != "" {
@@ -84,15 +113,15 @@ func GetTaskFunc(ctx context.Context, k8s kubernetes.Interface, tekton clientset
84113
}
85114

86115
return nil, fmt.Errorf("failed to convert obj %s into Task", obj.GetObjectKind().GroupVersionKind().String())
87-
}, kind, nil
116+
}, nil
88117
default:
89118
// Even if there is no task ref, we should try to return a local resolver.
90119
local := &LocalTaskRefResolver{
91120
Namespace: namespace,
92121
Kind: kind,
93122
Tektonclient: tekton,
94123
}
95-
return local.GetTask, kind, nil
124+
return local.GetTask, nil
96125
}
97126
}
98127

‎pkg/reconciler/taskrun/resources/taskref_test.go‎

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,11 @@ func TestGetTaskFunc(t *testing.T) {
226226
t.Fatalf("failed to upload test image: %s", err.Error())
227227
}
228228

229-
fn, kind, err := resources.GetTaskFunc(ctx, kubeclient, tektonclient, tc.ref, "default", "default")
229+
fn, err := resources.GetTaskFunc(ctx, kubeclient, tektonclient, tc.ref, "default", "default")
230230
if err != nil {
231231
t.Fatalf("failed to get task fn: %s", err.Error())
232232
}
233233

234-
if string(tc.expectedKind) != string(kind) {
235-
t.Errorf("expected kind %s did not match actual kind %s", tc.expectedKind, kind)
236-
}
237-
238234
task, err := fn(ctx, tc.ref.Name)
239235
if err != nil {
240236
t.Fatalf("failed to call taskfn: %s", err.Error())
@@ -246,3 +242,63 @@ func TestGetTaskFunc(t *testing.T) {
246242
})
247243
}
248244
}
245+
246+
func TestGetTaskFuncFromTaskRunSpecAlreadyFetched(t *testing.T) {
247+
ctx := context.Background()
248+
ctx, cancel := context.WithCancel(ctx)
249+
defer cancel()
250+
tektonclient := fake.NewSimpleClientset(tb.Task("simple", tb.TaskType, tb.TaskNamespace("default"), tb.TaskSpec(tb.Step("something"))))
251+
kubeclient := fakek8s.NewSimpleClientset(&v1.ServiceAccount{
252+
ObjectMeta: metav1.ObjectMeta{
253+
Namespace: "default",
254+
Name: "default",
255+
},
256+
})
257+
258+
name := "anyname-really"
259+
TaskSpec := v1beta1.TaskSpec{
260+
Steps: []v1beta1.Step{{
261+
Container: corev1.Container{
262+
Image: "myimage",
263+
},
264+
Script: `
265+
#!/usr/bin/env bash
266+
echo hello
267+
`,
268+
}},
269+
}
270+
TaskRun := &v1beta1.TaskRun{
271+
ObjectMeta: metav1.ObjectMeta{Namespace: "default"},
272+
Spec: v1beta1.TaskRunSpec{
273+
TaskRef: &v1beta1.TaskRef{
274+
// Using simple here to show that, it won't fetch the simple Taskspec,
275+
// which is different from the TaskSpec above
276+
Name: "simple",
277+
},
278+
ServiceAccountName: "default",
279+
},
280+
Status: v1beta1.TaskRunStatus{TaskRunStatusFields: v1beta1.TaskRunStatusFields{
281+
TaskSpec: &TaskSpec,
282+
}},
283+
}
284+
expectedTask := &v1beta1.Task{
285+
ObjectMeta: metav1.ObjectMeta{
286+
Name: name,
287+
Namespace: "default",
288+
},
289+
Spec: TaskSpec,
290+
}
291+
292+
fn, err := resources.GetTaskFuncFromTaskRun(ctx, kubeclient, tektonclient, TaskRun)
293+
if err != nil {
294+
t.Fatalf("failed to get Task fn: %s", err.Error())
295+
}
296+
actualTask, err := fn(ctx, name)
297+
if err != nil {
298+
t.Fatalf("failed to call Taskfn: %s", err.Error())
299+
}
300+
301+
if diff := cmp.Diff(actualTask, expectedTask); expectedTask != nil && diff != "" {
302+
t.Error(diff)
303+
}
304+
}

‎pkg/reconciler/taskrun/taskrun.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1beta1.TaskRun) (*v1beta1
271271
// and may not have had all of the assumed default specified.
272272
tr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
273273

274-
getTaskfunc, kind, err := resources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, tr.Spec.TaskRef, tr.Namespace, tr.Spec.ServiceAccountName)
274+
getTaskfunc, err := resources.GetTaskFuncFromTaskRun(ctx, c.KubeClientSet, c.PipelineClientSet, tr)
275275
if err != nil {
276276
logger.Errorf("Failed to fetch task reference %s: %v", tr.Spec.TaskRef.Name, err)
277277
tr.Status.SetCondition(&apis.Condition{
@@ -324,7 +324,7 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1beta1.TaskRun) (*v1beta1
324324
inputs = tr.Spec.Resources.Inputs
325325
outputs = tr.Spec.Resources.Outputs
326326
}
327-
rtr, err := resources.ResolveTaskResources(taskSpec, taskMeta.Name, kind, inputs, outputs, c.resourceLister.PipelineResources(tr.Namespace).Get)
327+
rtr, err := resources.ResolveTaskResources(taskSpec, taskMeta.Name, resources.GetTaskKind(tr), inputs, outputs, c.resourceLister.PipelineResources(tr.Namespace).Get)
328328
if err != nil {
329329
if k8serrors.IsNotFound(err) && tknreconciler.IsYoungResource(tr) {
330330
// For newly created resources, don't fail immediately.

0 commit comments

Comments
 (0)