Skip to content

Commit acd7a91

Browse files
sguiheuxyesnault
authored andcommitted
feat(api,ui): run workflow become async (#3999)
1 parent 059b369 commit acd7a91

25 files changed

+836
-398
lines changed

cli/cdsctl/monitoring.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,12 +284,10 @@ const (
284284
)
285285

286286
func (ui *Termui) staticRender() {
287-
checking, checkingColor := statusShort(sdk.StatusChecking.String())
288287
waiting, waitingColor := statusShort(sdk.StatusWaiting.String())
289288
building, buildingColor := statusShort(sdk.StatusBuilding.String())
290289
disabled, disabledColor := statusShort(sdk.StatusDisabled.String())
291-
ui.header.Text = fmt.Sprintf("[CDS | (h)elp | (q)uit | Legend: ](fg-cyan)[Checking:%s](%s) [Waiting:%s](%s) [Building:%s](%s) [Disabled:%s](%s)",
292-
checking, checkingColor,
290+
ui.header.Text = fmt.Sprintf("[CDS | (h)elp | (q)uit | Legend: ](fg-cyan) [Waiting:%s](%s) [Building:%s](%s) [Disabled:%s](%s)",
293291
waiting, waitingColor,
294292
building, buildingColor,
295293
disabled, disabledColor)

engine/api/workflow/dao_run.go

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"time"
1111

12+
"github.com/fsamin/go-dump"
1213
"github.com/go-gorp/gorp"
1314
"go.opencensus.io/stats"
1415

@@ -58,7 +59,6 @@ func UpdateWorkflowRun(ctx context.Context, db gorp.SqlExecutor, wr *sdk.Workflo
5859
defer end()
5960

6061
wr.LastModified = time.Now()
61-
6262
for _, info := range wr.Infos {
6363
if info.IsError && info.SubNumber == wr.LastSubNumber {
6464
wr.Status = string(sdk.StatusFail)
@@ -165,15 +165,6 @@ func (r *Run) PostGet(db gorp.SqlExecutor) error {
165165
for i := range w.Joins {
166166
w.Joins[i].Ref = fmt.Sprintf("%d", w.Joins[i].ID)
167167
}
168-
// This is usefull for oldserialized workflows...
169-
//TODO: delete this after a while
170-
if len(w.Pipelines) == 0 {
171-
w.Pipelines = map[int64]sdk.Pipeline{}
172-
w.Visit(func(n *sdk.WorkflowNode) {
173-
w.Pipelines[n.PipelineID] = n.DeprecatedPipeline
174-
n.PipelineName = n.DeprecatedPipeline.Name
175-
})
176-
}
177168
r.Workflow = w
178169

179170
i := []sdk.WorkflowRunInfo{}
@@ -233,7 +224,6 @@ func updateTags(db gorp.SqlExecutor, r *Run) error {
233224
if _, err := db.Exec("delete from workflow_run_tag where workflow_run_id = $1", r.ID); err != nil {
234225
return sdk.WrapError(err, "Unable to store tags")
235226
}
236-
237227
return InsertWorkflowRunTags(db, r.ID, r.Tags)
238228
}
239229

@@ -579,10 +569,74 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
579569
return nil
580570
}
581571

572+
// CreateRun creates a new workflow run and insert it
573+
func CreateRun(db *gorp.DbMap, wf *sdk.Workflow, opts *sdk.WorkflowRunPostHandlerOption, u *sdk.User) (*sdk.WorkflowRun, error) {
574+
number, err := NextRunNumber(db, wf.ID)
575+
if err != nil {
576+
return nil, sdk.WrapError(err, "unable to get next run number")
577+
}
578+
579+
wr := &sdk.WorkflowRun{
580+
Number: number,
581+
WorkflowID: wf.ID,
582+
Start: time.Now(),
583+
LastModified: time.Now(),
584+
ProjectID: wf.ProjectID,
585+
Status: sdk.StatusPending.String(),
586+
LastExecution: time.Now(),
587+
Tags: make([]sdk.WorkflowRunTag, 0),
588+
}
589+
590+
if opts != nil && opts.Hook != nil {
591+
if trigg, ok := opts.Hook.Payload["cds.triggered_by.username"]; ok {
592+
wr.Tag(tagTriggeredBy, trigg)
593+
} else {
594+
wr.Tag(tagTriggeredBy, "cds.hook")
595+
}
596+
} else {
597+
wr.Tag(tagTriggeredBy, u.Username)
598+
}
599+
600+
tags := wf.Metadata["default_tags"]
601+
var payload map[string]string
602+
if opts != nil && opts.Hook != nil {
603+
payload = opts.Hook.Payload
604+
}
605+
if opts != nil && opts.Manual != nil {
606+
e := dump.NewDefaultEncoder()
607+
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
608+
e.ExtraFields.DetailedMap = false
609+
e.ExtraFields.DetailedStruct = false
610+
e.ExtraFields.Len = false
611+
e.ExtraFields.Type = false
612+
m1, errm1 := e.ToStringMap(opts.Manual)
613+
if errm1 != nil {
614+
return nil, sdk.WrapError(errm1, "unable to compute manual payload")
615+
}
616+
payload = m1
617+
}
618+
if tags != "" {
619+
tagsSplited := strings.Split(tags, ",")
620+
for _, t := range tagsSplited {
621+
if pTag, hash := payload[t]; hash {
622+
wr.Tags = append(wr.Tags, sdk.WorkflowRunTag{
623+
Tag: t,
624+
Value: pTag,
625+
})
626+
}
627+
}
628+
}
629+
630+
if err := insertWorkflowRun(db, wr); err != nil {
631+
return nil, sdk.WrapError(err, "unable to create workflow run")
632+
}
633+
return wr, nil
634+
}
635+
582636
// UpdateRunNum Update run number for the given workflow
583637
func UpdateRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
584638
if num == 1 {
585-
if _, err := nextRunNumber(db, w); err != nil {
639+
if _, err := NextRunNumber(db, w.ID); err != nil {
586640
return sdk.WrapError(err, "Cannot create run number")
587641
}
588642
return nil
@@ -597,8 +651,8 @@ func UpdateRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
597651
return nil
598652
}
599653

600-
func nextRunNumber(db gorp.SqlExecutor, w *sdk.Workflow) (int64, error) {
601-
i, err := db.SelectInt("select workflow_sequences_nextval($1)", w.ID)
654+
func NextRunNumber(db gorp.SqlExecutor, workflowID int64) (int64, error) {
655+
i, err := db.SelectInt("select workflow_sequences_nextval($1)", workflowID)
602656
if err != nil {
603657
return 0, sdk.WrapError(err, "nextRunNumber")
604658
}

engine/api/workflow/dao_run_test.go

Lines changed: 78 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package workflow_test
33
import (
44
"context"
55
"fmt"
6+
"github.com/stretchr/testify/assert"
67
"testing"
78

89
"github.com/ovh/cds/engine/api/bootstrap"
@@ -131,13 +132,18 @@ func TestPurgeWorkflowRun(t *testing.T) {
131132
test.NoError(t, err)
132133

133134
for i := 0; i < 5; i++ {
134-
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
135-
User: *u,
136-
Payload: map[string]string{
137-
"git.branch": "master",
138-
"git.author": "test",
135+
wr, errWR := workflow.CreateRun(db, w1, nil, u)
136+
assert.NoError(t, errWR)
137+
wr.Workflow = *w1
138+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
139+
Manual: &sdk.WorkflowNodeRunManual{
140+
User: *u,
141+
Payload: map[string]string{
142+
"git.branch": "master",
143+
"git.author": "test",
144+
},
139145
},
140-
}, nil)
146+
}, u, nil)
141147
test.NoError(t, errWr)
142148
}
143149

@@ -222,15 +228,19 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
222228
test.NoError(t, err)
223229

224230
for i := 0; i < 5; i++ {
225-
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
226-
User: *u,
227-
Payload: map[string]string{
228-
"git.branch": "master",
229-
"git.author": "test",
231+
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
232+
assert.NoError(t, errWR)
233+
wfr.Workflow = *w1
234+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
235+
Manual: &sdk.WorkflowNodeRunManual{
236+
User: *u,
237+
Payload: map[string]string{
238+
"git.branch": "master",
239+
"git.author": "test",
240+
},
230241
},
231-
}, nil)
242+
}, u, nil)
232243
test.NoError(t, errWr)
233-
234244
wfr.Status = sdk.StatusBuilding.String()
235245
test.NoError(t, workflow.UpdateWorkflowRunStatus(db, wfr))
236246
}
@@ -315,23 +325,33 @@ func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
315325
})
316326
test.NoError(t, err)
317327

318-
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
319-
User: *u,
320-
Payload: map[string]string{
321-
"git.branch": "master",
322-
"git.author": "test",
323-
},
324-
}, nil)
325-
test.NoError(t, errWr)
326-
327-
for i := 0; i < 5; i++ {
328-
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
328+
wr, errWR := workflow.CreateRun(db, w1, nil, u)
329+
assert.NoError(t, errWR)
330+
wr.Workflow = *w1
331+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
332+
Manual: &sdk.WorkflowNodeRunManual{
329333
User: *u,
330334
Payload: map[string]string{
331335
"git.branch": "master",
332336
"git.author": "test",
333337
},
334-
}, nil)
338+
},
339+
}, u, nil)
340+
test.NoError(t, errWr)
341+
342+
for i := 0; i < 5; i++ {
343+
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
344+
assert.NoError(t, errWR)
345+
wfr.Workflow = *w1
346+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
347+
Manual: &sdk.WorkflowNodeRunManual{
348+
User: *u,
349+
Payload: map[string]string{
350+
"git.branch": "master",
351+
"git.author": "test",
352+
},
353+
},
354+
}, u, nil)
335355
test.NoError(t, errWr)
336356

337357
wfr.Status = sdk.StatusFail.String()
@@ -423,13 +443,18 @@ func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
423443
test.NoError(t, err)
424444

425445
for i := 0; i < 5; i++ {
426-
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
427-
User: *u,
428-
Payload: map[string]string{
429-
"git.branch": "master",
430-
"git.author": "test",
446+
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
447+
assert.NoError(t, errWR)
448+
wfr.Workflow = *w1
449+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
450+
Manual: &sdk.WorkflowNodeRunManual{
451+
User: *u,
452+
Payload: map[string]string{
453+
"git.branch": "master",
454+
"git.author": "test",
455+
},
431456
},
432-
}, nil)
457+
}, u, nil)
433458
test.NoError(t, errWr)
434459

435460
wfr.Status = sdk.StatusFail.String()
@@ -515,13 +540,18 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
515540

516541
branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
517542
for i := 0; i < 10; i++ {
518-
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
519-
User: *u,
520-
Payload: map[string]string{
521-
"git.branch": branches[i],
522-
"git.author": "test",
543+
wr, errWR := workflow.CreateRun(db, w1, nil, u)
544+
assert.NoError(t, errWR)
545+
wr.Workflow = *w1
546+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
547+
Manual: &sdk.WorkflowNodeRunManual{
548+
User: *u,
549+
Payload: map[string]string{
550+
"git.branch": branches[i],
551+
"git.author": "test",
552+
},
523553
},
524-
}, nil)
554+
}, u, nil)
525555
test.NoError(t, errWr)
526556
}
527557

@@ -604,13 +634,18 @@ func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {
604634

605635
branches := []string{"master", "master", "master", "develop", "develop", "testBr", "testBr", "testBr", "testBr", "test4"}
606636
for i := 0; i < 10; i++ {
607-
_, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{
608-
User: *u,
609-
Payload: map[string]string{
610-
"git.branch": branches[i],
611-
"git.author": "test",
637+
wr, errWR := workflow.CreateRun(db, w1, nil, u)
638+
assert.NoError(t, errWR)
639+
wr.Workflow = *w1
640+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wr, &sdk.WorkflowRunPostHandlerOption{
641+
Manual: &sdk.WorkflowNodeRunManual{
642+
User: *u,
643+
Payload: map[string]string{
644+
"git.branch": branches[i],
645+
"git.author": "test",
646+
},
612647
},
613-
}, nil)
648+
}, u, nil)
614649
test.NoError(t, errWr)
615650
}
616651

engine/api/workflow/dao_staticfiles_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,12 @@ func TestInsertStaticFiles(t *testing.T) {
8080
})
8181
test.NoError(t, err)
8282

83-
wfr, _, errWr := workflow.ManualRun(context.TODO(), db, cache, proj, w1, &sdk.WorkflowNodeRunManual{User: *u}, nil)
83+
wfr, errWR := workflow.CreateRun(db, w1, nil, u)
84+
assert.NoError(t, errWR)
85+
wfr.Workflow = *w1
86+
_, errWr := workflow.StartWorkflowRun(context.TODO(), db, cache, proj, wfr, &sdk.WorkflowRunPostHandlerOption{
87+
Manual: &sdk.WorkflowNodeRunManual{User: *u},
88+
}, u, nil)
8489
test.NoError(t, errWr)
8590

8691
var stFile sdk.StaticFiles

engine/api/workflow/process_data.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
4545
//////
4646

4747
//// Process Report
48+
oldStatus := wr.Status
4849
report := new(ProcessorReport)
4950
defer func(oldStatus string, wr *sdk.WorkflowRun) {
5051
if oldStatus != wr.Status {
5152
report.Add(*wr)
5253
}
53-
}(wr.Status, wr)
54+
}(oldStatus, wr)
5455
////
5556

56-
wr.Status = string(sdk.StatusBuilding)
57+
wr.Status = sdk.StatusBuilding.String()
5758
maxsn := MaxSubNumber(wr.WorkflowNodeRuns)
5859
wr.LastSubNumber = maxsn
5960

0 commit comments

Comments
 (0)