Skip to content

Commit 125b85b

Browse files
fsaminbnjjj
authored andcommitted
feat(sdk): order queue by project instance (#2983)
* feat(sdk): order queue by project instance * fix(api): order workflow queue
1 parent ed99a4a commit 125b85b

File tree

7 files changed

+92
-3
lines changed

7 files changed

+92
-3
lines changed

engine/api/workflow/dao_node_run_job.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
6262
from workflow_node_run_job
6363
where workflow_node_run_job.queued >= $1
6464
and workflow_node_run_job.queued <= $2
65-
and workflow_node_run_job.status = ANY(string_to_array($3, ','))`
65+
and workflow_node_run_job.status = ANY(string_to_array($3, ','))
66+
order by workflow_node_run_job.queued ASC`
6667

6768
args := []interface{}{*since, *until, strings.Join(statuses, ",")}
6869

@@ -84,6 +85,7 @@ func LoadNodeJobRunQueue(db gorp.SqlExecutor, store cache.Store, rights int, gro
8485
AND workflow_node_run_job.queued >= $1
8586
AND workflow_node_run_job.queued <= $2
8687
AND workflow_node_run_job.status = ANY(string_to_array($3, ','))
88+
ORDER BY workflow_node_run_job.queued ASC
8789
`
8890

8991
var groupID string

engine/api/workflow/execute_node_run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
361361

362362
//Create the job run
363363
wjob := sdk.WorkflowNodeJobRun{
364+
ProjectID: wr.ProjectID,
364365
WorkflowNodeRunID: run.ID,
365366
Start: time.Time{},
366367
Queued: time.Now(),
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- +migrate Up
2+
ALTER TABLE workflow_node_run_job ADD COLUMN project_id BIGINT DEFAULT 0;
3+
UPDATE workflow_node_run_job SET project_id = 0;
4+
5+
-- +migrate Down
6+
ALTER TABLE workflow_node_run_job DROP COLUMN project_id;

sdk/cdsclient/client_queue.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ
4545
}
4646

4747
if jobs != nil {
48-
queue := []sdk.WorkflowNodeJobRun{}
48+
queue := sdk.WorkflowQueue{}
4949
if _, err := c.GetJSON("/queue/workflows", &queue); err != nil {
5050
errs <- sdk.WrapError(err, "Unable to load old jobs")
5151
}
52+
queue.Sort()
5253
for _, j := range queue {
5354
jobs <- j
5455
}

sdk/workflow_run.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"html/template"
77
"net/url"
8+
"sort"
89
"strings"
910
"time"
1011

@@ -199,6 +200,7 @@ func (w WorkflowNodeRunArtifact) Equal(c WorkflowNodeRunArtifact) bool {
199200
//WorkflowNodeJobRun represents an job to be run
200201
//easyjson:json
201202
type WorkflowNodeJobRun struct {
203+
ProjectID int64 `json:"project_id" db:"project_id"`
202204
ID int64 `json:"id" db:"id"`
203205
WorkflowNodeRunID int64 `json:"workflow_node_run_id,omitempty" db:"workflow_node_run_id"`
204206
Job ExecutedJob `json:"job" db:"-"`
@@ -323,3 +325,22 @@ func (nr WorkflowNodeRun) Report() (string, error) {
323325
errE := t.Execute(out, nr)
324326
return out.String(), errE
325327
}
328+
329+
type WorkflowQueue []WorkflowNodeJobRun
330+
331+
func (q WorkflowQueue) Sort() {
332+
//Count the number of WorkflowNodeJobRun per project_id
333+
n := make(map[int64]int, len(q))
334+
for _, j := range q {
335+
nb := n[j.ProjectID]
336+
nb++
337+
n[j.ProjectID] = nb
338+
}
339+
340+
sort.Slice(q, func(i, j int) bool {
341+
p1 := n[q[i].ProjectID]
342+
p2 := n[q[j].ProjectID]
343+
return p1 < p2
344+
})
345+
346+
}

sdk/workflow_run_easyjson.go

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/workflow_run_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,49 @@ func TestWorkflowRunReport(t *testing.T) {
105105
assert.NoError(t, err)
106106
t.Log(s)
107107
}
108+
109+
func TestWorkflowQueue_Sort(t *testing.T) {
110+
tests := []struct {
111+
name string
112+
q WorkflowQueue
113+
expected WorkflowQueue
114+
}{
115+
{
116+
name: "test sort 1",
117+
q: WorkflowQueue{
118+
{
119+
ProjectID: 1,
120+
ID: 1,
121+
},
122+
{
123+
ProjectID: 1,
124+
ID: 2,
125+
},
126+
{
127+
ProjectID: 2,
128+
ID: 3,
129+
},
130+
},
131+
expected: WorkflowQueue{
132+
{
133+
ProjectID: 2,
134+
ID: 3,
135+
},
136+
{
137+
ProjectID: 1,
138+
ID: 1,
139+
},
140+
{
141+
ProjectID: 1,
142+
ID: 2,
143+
},
144+
},
145+
},
146+
}
147+
for _, tt := range tests {
148+
t.Run(tt.name, func(t *testing.T) {
149+
tt.q.Sort()
150+
assert.Equal(t, tt.expected, tt.q)
151+
})
152+
}
153+
}

0 commit comments

Comments
 (0)