Skip to content

Commit 0476cc2

Browse files
authored
feat(api): add trace on workflow queue (#3306)
1 parent 7b89844 commit 0476cc2

File tree

5 files changed

+33
-17
lines changed

5 files changed

+33
-17
lines changed

engine/api/api_routes.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -328,8 +328,8 @@ func (api *API) InitRouter() {
328328
r.Handle("/build/{id}/step", r.POST(api.updateStepStatusHandler))
329329

330330
//Workflow queue
331-
r.Handle("/queue/workflows", r.GET(api.getWorkflowJobQueueHandler))
332-
r.Handle("/queue/workflows/count", r.GET(api.countWorkflowJobQueueHandler))
331+
r.Handle("/queue/workflows", r.GET(api.getWorkflowJobQueueHandler, EnableTracing()))
332+
r.Handle("/queue/workflows/count", r.GET(api.countWorkflowJobQueueHandler, EnableTracing()))
333333
r.Handle("/queue/workflows/{id}/take", r.POST(api.postTakeWorkflowJobHandler, NeedWorker(), EnableTracing()))
334334
r.Handle("/queue/workflows/{id}/book", r.POST(api.postBookWorkflowJobHandler, NeedHatchery(), EnableTracing()), r.DELETE(api.deleteBookWorkflowJobHandler, NeedHatchery(), EnableTracing()))
335335
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))

engine/api/mon_db.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,7 @@ func (api *API) getMonDBTimesDBQueueWorkflow(ctx context.Context, r *http.Reques
109109
usr = nil
110110
}
111111

112-
if _, err := workflow.LoadNodeJobRunQueue(api.mustDB(), api.Cache, permissions, groupsID, usr, nil, nil, nil); err != nil {
112+
if _, err := workflow.LoadNodeJobRunQueue(ctx, api.mustDB(), api.Cache, permissions, groupsID, usr, nil, nil, nil); err != nil {
113113
return fmt.Sprintf("getMonDBTimesDBQueueWorkflow> Unable to load queue:: %s", err)
114114
}
115115
return elapsed("getMonDBTimesDBQueueWorkflow", s1)

engine/api/workflow/dao_node_run_job.go

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,10 +25,10 @@ func isSharedInfraGroup(groupsID []int64) bool {
2525
}
2626

2727
// CountNodeJobRunQueue count all workflow_node_run_job accessible
28-
func CountNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, statuses ...string) (sdk.WorkflowNodeJobRunCount, error) {
28+
func CountNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, statuses ...string) (sdk.WorkflowNodeJobRunCount, error) {
2929
c := sdk.WorkflowNodeJobRunCount{}
3030

31-
queue, err := LoadNodeJobRunQueue(db, store, permission.PermissionRead, groupsID, usr, since, until, nil, statuses...)
31+
queue, err := LoadNodeJobRunQueue(ctx, db, store, permission.PermissionRead, groupsID, usr, since, until, nil, statuses...)
3232
if err != nil {
3333
return c, sdk.WrapError(err, "CountNodeJobRunQueue> unable to load queue")
3434
}
@@ -44,7 +44,9 @@ func CountNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, groupsID []int
4444
}
4545

4646
// LoadNodeJobRunQueue load all workflow_node_run_job accessible
47-
func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, limit *int, statuses ...string) ([]sdk.WorkflowNodeJobRun, error) {
47+
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, rights int, groupsID []int64, usr *sdk.User, since *time.Time, until *time.Time, limit *int, statuses ...string) ([]sdk.WorkflowNodeJobRun, error) {
48+
ctx, end := observability.Span(ctx, "LoadNodeJobRunQueue")
49+
defer end()
4850
if since == nil {
4951
since = new(time.Time)
5052
}
@@ -68,6 +70,7 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
6870
args := []interface{}{*since, *until, strings.Join(statuses, ",")}
6971

7072
if usr != nil && !usr.Admin {
73+
observability.Current(ctx, observability.Tag("isAdmin", false))
7174
query = `
7275
SELECT DISTINCT workflow_node_run_job.*
7376
FROM workflow_node_run_job
@@ -97,6 +100,8 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
97100
}
98101
}
99102
args = append(args, groupID, group.SharedInfraGroup.ID)
103+
} else {
104+
observability.Current(ctx, observability.Tag("isAdmin", true))
100105
}
101106

102107
if limit != nil && *limit > 0 {
@@ -106,13 +111,20 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
106111

107112
isSharedInfraGroup := isSharedInfraGroup(groupsID)
108113
sqlJobs := []JobRun{}
114+
_, next := observability.Span(ctx, "LoadNodeJobRunQueue.select")
109115
if _, err := db.Select(&sqlJobs, query, args...); err != nil {
116+
next()
110117
return nil, sdk.WrapError(err, "workflow.LoadNodeJobRun> Unable to load job runs (Select)")
111118
}
119+
next()
120+
121+
ctx2, next2 := observability.Span(ctx, "LoadNodeJobRunQueue.sqlJobs")
112122

113123
jobs := make([]sdk.WorkflowNodeJobRun, 0, len(sqlJobs))
114124
for i := range sqlJobs {
125+
_, next3 := observability.Span(ctx2, "LoadNodeJobRunQueue.loadHatcheryInfo")
115126
getHatcheryInfo(store, &sqlJobs[i])
127+
next3()
116128
jr, err := sqlJobs[i].WorkflowNodeRunJob()
117129
if err != nil {
118130
log.Error("LoadNodeJobRunQueue> WorkflowNodeRunJob error: %v", err)
@@ -139,6 +151,7 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
139151

140152
jobs = append(jobs, jr)
141153
}
154+
next2()
142155

143156
return jobs, nil
144157
}

engine/api/workflow/run_workflow_test.go

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ func TestManualRun1(t *testing.T) {
2323
u, _ := assets.InsertAdminUser(db)
2424
key := sdk.RandomString(10)
2525
proj := assets.InsertTestProject(t, db, cache, key, key, u)
26+
ctx := context.Background()
2627

2728
//First pipeline
2829
pip := sdk.Pipeline{
@@ -123,7 +124,7 @@ func TestManualRun1(t *testing.T) {
123124
test.Equal(t, lastrun.WorkflowNodeRuns[w1.RootID][0], nodeRun)
124125

125126
//TestLoadNodeJobRun
126-
jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
127+
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
127128
test.NoError(t, err)
128129
test.Equal(t, 2, len(jobs))
129130

@@ -150,6 +151,7 @@ func TestManualRun2(t *testing.T) {
150151
u, _ := assets.InsertAdminUser(db)
151152
key := sdk.RandomString(10)
152153
proj := assets.InsertTestProject(t, db, cache, key, key, u)
154+
ctx := context.Background()
153155

154156
//First pipeline
155157
pip := sdk.Pipeline{
@@ -236,7 +238,7 @@ func TestManualRun2(t *testing.T) {
236238
_, _, err = workflow.ManualRunFromNode(context.TODO(), db, cache, proj, w1, 1, &sdk.WorkflowNodeRunManual{User: *u}, w1.RootID)
237239
test.NoError(t, err)
238240

239-
jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
241+
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
240242
test.NoError(t, err)
241243

242244
assert.Len(t, jobs, 3)
@@ -247,6 +249,7 @@ func TestManualRun3(t *testing.T) {
247249
u, _ := assets.InsertAdminUser(db)
248250
key := sdk.RandomString(10)
249251
proj := assets.InsertTestProject(t, db, cache, key, key, u)
252+
ctx := context.Background()
250253

251254
test.NoError(t, project.AddKeyPair(db, proj, "key", u))
252255

@@ -330,17 +333,17 @@ func TestManualRun3(t *testing.T) {
330333
test.NoError(t, err)
331334

332335
// test nil since/until
333-
_, err = workflow.CountNodeJobRunQueue(db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil)
336+
_, err = workflow.CountNodeJobRunQueue(ctx, db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil)
334337
test.NoError(t, err)
335338

336339
// queue should be empty with since 0,0 until 0,0
337340
t0 := time.Unix(0, 0)
338341
t1 := time.Unix(0, 0)
339-
countAlreadyInQueueNone, err := workflow.CountNodeJobRunQueue(db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1)
342+
countAlreadyInQueueNone, err := workflow.CountNodeJobRunQueue(ctx, db, cache, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1)
340343
test.NoError(t, err)
341344
assert.Equal(t, 0, int(countAlreadyInQueueNone.Count))
342345

343-
jobs, err := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
346+
jobs, err := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
344347
test.NoError(t, err)
345348

346349
for i := range jobs {
@@ -436,7 +439,7 @@ func TestManualRun3(t *testing.T) {
436439
tx.Commit()
437440
}
438441

439-
jobs, err = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
442+
jobs, err = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, nil, nil, nil)
440443
test.NoError(t, err)
441444
assert.Equal(t, 1, len(jobs))
442445

@@ -450,15 +453,15 @@ func TestManualRun3(t *testing.T) {
450453

451454
t0 := since.Add(-2 * time.Minute)
452455
t1 := since.Add(-1 * time.Minute)
453-
jobsSince, errW := workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
456+
jobsSince, errW := workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
454457
test.NoError(t, errW)
455458
for _, job := range jobsSince {
456459
if jobs[0].ID == job.ID {
457460
assert.Fail(t, " this job should not be in queue since/until")
458461
}
459462
}
460463

461-
jobsSince, errW = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &since, nil, nil)
464+
jobsSince, errW = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &since, nil, nil)
462465
test.NoError(t, errW)
463466
var found bool
464467
for _, job := range jobsSince {
@@ -472,7 +475,7 @@ func TestManualRun3(t *testing.T) {
472475

473476
t0 = since.Add(10 * time.Second)
474477
t1 = since.Add(15 * time.Second)
475-
jobsSince, errW = workflow.LoadNodeJobRunQueue(db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
478+
jobsSince, errW = workflow.LoadNodeJobRunQueue(ctx, db, cache, permission.PermissionReadExecute, []int64{proj.ProjectGroups[0].Group.ID}, u, &t0, &t1, nil)
476479
test.NoError(t, errW)
477480
for _, job := range jobsSince {
478481
if jobs[0].ID == job.ID {

engine/api/workflow_queue.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -691,7 +691,7 @@ func (api *API) countWorkflowJobQueueHandler() service.Handler {
691691
usr = nil
692692
}
693693

694-
count, err := workflow.CountNodeJobRunQueue(api.mustDB(), api.Cache, groupsID, usr, &since, &until)
694+
count, err := workflow.CountNodeJobRunQueue(ctx, api.mustDB(), api.Cache, groupsID, usr, &since, &until)
695695
if err != nil {
696696
return sdk.WrapError(err, "countWorkflowJobQueueHandler> Unable to count queue")
697697
}
@@ -725,7 +725,7 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
725725
usr = nil
726726
}
727727

728-
jobs, err := workflow.LoadNodeJobRunQueue(api.mustDB(), api.Cache, permissions, groupsID, usr, &since, &until, &limit, status...)
728+
jobs, err := workflow.LoadNodeJobRunQueue(ctx, api.mustDB(), api.Cache, permissions, groupsID, usr, &since, &until, &limit, status...)
729729
if err != nil {
730730
return sdk.WrapError(err, "getWorkflowJobQueueHandler> Unable to load queue")
731731
}

0 commit comments

Comments
 (0)