Skip to content

Commit ec92876

Browse files
fsaminbnjjj
authored andcommitted
feat(sdk, hatchery, worker): get queue from server side events (#3376)
1 parent cd8441b commit ec92876

File tree

16 files changed

+287
-145
lines changed

16 files changed

+287
-145
lines changed

engine/api/api_routes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (api *API) InitRouter() {
322322
r.Handle("/queue/workflows/{id}/take", r.POST(api.postTakeWorkflowJobHandler, NeedWorker(), EnableTracing()))
323323
r.Handle("/queue/workflows/{id}/book", r.POST(api.postBookWorkflowJobHandler, NeedHatchery(), EnableTracing()), r.DELETE(api.deleteBookWorkflowJobHandler, NeedHatchery(), EnableTracing()))
324324
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))
325-
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), EnableTracing()))
325+
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), NeedHatchery(), EnableTracing()))
326326
r.Handle("/queue/workflows/{permID}/vulnerability", r.POSTEXECUTE(api.postVulnerabilityReportHandler, NeedWorker(), EnableTracing()))
327327
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery(), EnableTracing()))
328328
r.Handle("/queue/workflows/{permID}/result", r.POSTEXECUTE(api.postWorkflowJobResultHandler, NeedWorker(), EnableTracing()))

engine/api/auth/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func CheckServiceAuth(ctx context.Context, db *gorp.DbMap, store cache.Store, he
169169

170170
serviceHash := string(id)
171171
if serviceHash == "" {
172-
return ctx, fmt.Errorf("bad service id")
172+
return ctx, fmt.Errorf("missing service Hash")
173173
}
174174

175175
srv, err := GetService(db, store, serviceHash)

engine/api/event/publish_workflow_run.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,17 @@ func PublishWorkflowNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, w sdk.W
9999
}
100100
publishRunWorkflow(e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status, nil)
101101
}
102+
103+
// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
104+
func PublishWorkflowNodeJobRun(db gorp.SqlExecutor, pkey, wname string, jr sdk.WorkflowNodeJobRun) {
105+
e := sdk.EventRunWorkflowJob{
106+
ID: jr.ID,
107+
Status: jr.Status,
108+
Start: jr.Start.Unix(),
109+
}
110+
111+
if sdk.StatusIsTerminated(jr.Status) {
112+
e.Done = jr.Done.Unix()
113+
}
114+
publishRunWorkflow(e, pkey, wname, "", "", "", 0, 0, jr.Status, nil)
115+
}

engine/api/events.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"github.com/go-gorp/gorp"
1414

1515
"github.com/ovh/cds/engine/api/cache"
16+
"github.com/ovh/cds/engine/api/group"
1617
"github.com/ovh/cds/engine/api/permission"
17-
"github.com/ovh/cds/engine/api/sessionstore"
1818
"github.com/ovh/cds/engine/service"
1919
"github.com/ovh/cds/sdk"
2020
"github.com/ovh/cds/sdk/log"
@@ -137,19 +137,10 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
137137
return sdk.WrapError(fmt.Errorf("streaming unsupported"), "")
138138
}
139139

140-
uuidSK, errS := sessionstore.NewSessionKey()
141-
if errS != nil {
142-
return sdk.WrapError(errS, "eventsBroker.Serve> Cannot generate UUID")
143-
}
144-
uuid := string(uuidSK)
145-
user := getUser(ctx)
146-
if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil {
147-
return sdk.WrapError(err, "eventsBroker.Serve Cannot load user permission")
148-
}
149-
140+
uuid := sdk.UUID()
150141
client := eventsBrokerSubscribe{
151142
UUID: uuid,
152-
User: user,
143+
User: getUser(ctx),
153144
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
154145
}
155146

@@ -237,38 +228,46 @@ func (b *eventsBroker) canSend(client eventsBrokerSubscribe) bool {
237228
}
238229

239230
func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
231+
var isSharedInfra bool
232+
for _, g := range s.User.Groups {
233+
if g.ID == group.SharedInfraGroup.ID {
234+
isSharedInfra = true
235+
break
236+
}
237+
}
238+
240239
if strings.HasPrefix(event.EventType, "sdk.EventProject") {
241-
if s.User.Admin || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
240+
if s.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
242241
return true
243242
}
244243
return false
245244
}
246245
if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") {
247-
if s.User.Admin || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
246+
if s.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
248247
return true
249248
}
250249
return false
251250
}
252251
if strings.HasPrefix(event.EventType, "sdk.EventApplication") {
253-
if s.User.Admin || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
252+
if s.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
254253
return true
255254
}
256255
return false
257256
}
258257
if strings.HasPrefix(event.EventType, "sdk.EventPipeline") {
259-
if s.User.Admin || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
258+
if s.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
260259
return true
261260
}
262261
return false
263262
}
264263
if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") {
265-
if s.User.Admin || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
264+
if s.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
266265
return true
267266
}
268267
return false
269268
}
270269
if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") {
271-
if s.User.Admin || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
270+
if s.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
272271
return true
273272
}
274273
return false

engine/api/grpc_handlers.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,8 @@ func (h *grpcHandlers) SendResult(c context.Context, res *sdk.Result) (*empty.Em
104104
return new(empty.Empty), sdk.WrapError(err, "SendResult> Cannot post job result")
105105
}
106106

107-
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
108-
workflow.ResyncNodeRunsWithCommits(c, db, h.store, p, workflowNodeRuns)
109-
110-
go workflow.SendEvent(db, workflowRuns, workflowNodeRuns, p.Key)
107+
workflow.ResyncNodeRunsWithCommits(c, db, h.store, p, report)
108+
go workflow.SendEvent(db, p.Key, report)
111109

112110
return new(empty.Empty), nil
113111
}

engine/api/router_middleware.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"strings"
99

1010
"github.com/go-gorp/gorp"
11-
1211
"github.com/gorilla/mux"
1312

1413
"github.com/ovh/cds/engine/api/auth"
@@ -132,6 +131,10 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
132131
return ctx, sdk.WrapError(sdk.ErrUnauthorized, "Router> Unable to find connected user")
133132
}
134133

134+
if rc.Options["allowServices"] == "true" && getService(ctx) != nil {
135+
return ctx, nil
136+
}
137+
135138
if rc.Options["needHatchery"] == "true" && getHatchery(ctx) != nil {
136139
return ctx, nil
137140
}
@@ -144,10 +147,6 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
144147
return ctx, nil
145148
}
146149

147-
if rc.Options["allowServices"] == "true" && getService(ctx) != nil {
148-
return ctx, nil
149-
}
150-
151150
if getUser(ctx).Admin {
152151
return ctx, nil
153152
}

engine/api/workflow/execute_node_job_run.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type ProcessorReport struct {
3131
errors []error
3232
}
3333

34+
// WorkflowRuns returns the list of concerned workflow runs
35+
func (r *ProcessorReport) WorkflowRuns() []sdk.WorkflowRun {
36+
return r.workflows
37+
}
38+
3439
// Add something to the report
3540
func (r *ProcessorReport) Add(i ...interface{}) {
3641
r.mutex.Lock()

engine/api/workflow/resync_workflow.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ func ResyncWorkflowRunStatus(db gorp.SqlExecutor, wr *sdk.WorkflowRun) (*Process
9898
}
9999

100100
// ResyncNodeRunsWithCommits load commits build in this node run and save it into node run
101-
func ResyncNodeRunsWithCommits(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, nodeRuns []sdk.WorkflowNodeRun) {
101+
func ResyncNodeRunsWithCommits(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, report *ProcessorReport) {
102+
nodeRuns := report.nodes
102103
for _, nodeRun := range nodeRuns {
103104
if len(nodeRun.Commits) > 0 {
104105
continue

engine/api/workflow/workflow_run_event.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,12 @@ import (
1717
"github.com/ovh/cds/sdk/log"
1818
)
1919

20-
// GetWorkflowRunEventData read channel to get elements to push
21-
// TODO: refactor this useless function
22-
func GetWorkflowRunEventData(report *ProcessorReport, projectKey string) ([]sdk.WorkflowRun, []sdk.WorkflowNodeRun) {
23-
return report.workflows, report.nodes
24-
}
25-
2620
// SendEvent Send event on workflow run
27-
func SendEvent(db gorp.SqlExecutor, wrs []sdk.WorkflowRun, wnrs []sdk.WorkflowNodeRun, key string) {
28-
for _, wr := range wrs {
21+
func SendEvent(db gorp.SqlExecutor, key string, report *ProcessorReport) {
22+
for _, wr := range report.workflows {
2923
event.PublishWorkflowRun(wr, key)
3024
}
31-
for _, wnr := range wnrs {
25+
for _, wnr := range report.nodes {
3226
wr, errWR := LoadRunByID(db, wnr.WorkflowRunID, LoadRunOptions{
3327
WithLightTests: true,
3428
})
@@ -56,6 +50,22 @@ func SendEvent(db gorp.SqlExecutor, wrs []sdk.WorkflowRun, wnrs []sdk.WorkflowNo
5650

5751
event.PublishWorkflowNodeRun(db, wnr, wr.Workflow, &previousNodeRun)
5852
}
53+
54+
for _, jobrun := range report.jobs {
55+
noderun, err := LoadNodeRunByID(db, jobrun.WorkflowNodeRunID, LoadRunOptions{})
56+
if err != nil {
57+
log.Warning("SendEvent.workflow> Cannot load workflow node run %d: %s", jobrun.WorkflowNodeRunID, err)
58+
continue
59+
}
60+
wr, errWR := LoadRunByID(db, noderun.WorkflowRunID, LoadRunOptions{
61+
WithLightTests: true,
62+
})
63+
if errWR != nil {
64+
log.Warning("SendEvent.workflow> Cannot load workflow run %d: %s", noderun.WorkflowRunID, errWR)
65+
continue
66+
}
67+
event.PublishWorkflowNodeJobRun(db, key, wr.Workflow.Name, jobrun)
68+
}
5969
}
6070

6171
// ResyncCommitStatus resync commit status for a workflow run

engine/api/workflow_queue.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,8 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
100100
return sdk.WrapError(errT, "postTakeWorkflowJobHandler> Cannot takeJob nodeJobRunID:%d", id)
101101
}
102102

103-
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, p.Key)
104-
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, workflowNodeRuns)
105-
106-
go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, p.Key)
103+
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, p, report)
104+
go workflow.SendEvent(api.mustDB(), p.Key, report)
107105

108106
return service.WriteJSON(w, pbji, http.StatusOK)
109107
}
@@ -425,9 +423,7 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {
425423
return sdk.WrapError(err, "postWorkflowJobResultHandler> unable to post job result")
426424
}
427425

428-
observability.Record(ctx, api.Stats.WorkflowRunStarted, 1)
429-
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, proj.Key)
430-
426+
workflowRuns := report.WorkflowRuns()
431427
if len(workflowRuns) > 0 {
432428
observability.Current(ctx,
433429
observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name))
@@ -440,10 +436,10 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {
440436
db := api.mustDB()
441437

442438
_, next = observability.Span(ctx, "workflow.ResyncNodeRunsWithCommits")
443-
workflow.ResyncNodeRunsWithCommits(ctx, db, api.Cache, proj, workflowNodeRuns)
439+
workflow.ResyncNodeRunsWithCommits(ctx, db, api.Cache, proj, report)
444440
next()
445441

446-
go workflow.SendEvent(db, workflowRuns, workflowNodeRuns, proj.Key)
442+
go workflow.SendEvent(db, proj.Key, report)
447443

448444
return nil
449445
}

0 commit comments

Comments
 (0)