Skip to content

Commit 74d7252

Browse files
bnjjjyesnault
authored andcommitted
feat(worker): provisioned worker should not take when memory requirements (#4084)
Signed-off-by: Benjamin Coenen <[email protected]>
1 parent 0734574 commit 74d7252

File tree

7 files changed

+40
-30
lines changed

7 files changed

+40
-30
lines changed

docs/content/docs/concepts/requirement/_index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Type of requirements:
1313
- Hostname
1414
- [Network access]({{< relref "/docs/concepts/requirement/requirement_network.md" >}})
1515
- [Service]({{< relref "/docs/concepts/requirement/requirement_service.md" >}})
16-
- Memory
16+
- [Memory]({{< relref "/docs/concepts/requirement/requirement_memory.md" >}})
1717
- [OS & Architecture]({{< relref "/docs/concepts/requirement/requirement_os_arch.md" >}})
1818

1919
A [Job]({{< relref "/docs/concepts/job.md" >}}) will be executed by a **worker**.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
title: "Memory"
3+
weight: 6
4+
---
5+
6+
The Memory requirement allows you to require a worker to have a specific number of mb of RAM.
7+
8+
For example if you need 2Gb of RAM for your worker you can put `2048` in your memory requirement.

engine/hatchery/kubernetes/kubernetes.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import (
1515
"k8s.io/apimachinery/pkg/api/resource"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1717
"k8s.io/client-go/kubernetes"
18+
_ "k8s.io/client-go/plugin/pkg/client/auth"
1819
"k8s.io/client-go/rest"
1920
"k8s.io/client-go/tools/clientcmd"
20-
_ "k8s.io/client-go/plugin/pkg/client/auth"
2121
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
2222

2323
"github.com/ovh/cds/engine/api"
@@ -233,6 +233,10 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
233233
label = "register"
234234
}
235235

236+
// Kubernetes pod name must not be > 63 chars
237+
if len(name) > 63 {
238+
name = name[:60]
239+
}
236240
log.Debug("hatchery> kubernetes> SpawnWorker> %s", name)
237241

238242
var logJob string
@@ -290,6 +294,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
290294
}
291295
envsWm := map[string]string{}
292296
envsWm["CDS_FORCE_EXIT"] = "1"
297+
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
293298
envsWm["CDS_API"] = udataParam.API
294299
envsWm["CDS_TOKEN"] = udataParam.Token
295300
envsWm["CDS_NAME"] = udataParam.Name

engine/hatchery/marathon/marathon.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
285285

286286
envsWm := map[string]string{}
287287
envsWm["CDS_FORCE_EXIT"] = "0"
288+
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
288289
envsWm["CDS_API"] = udataParam.API
289290
envsWm["CDS_TOKEN"] = udataParam.Token
290291
envsWm["CDS_NAME"] = udataParam.Name

engine/hatchery/swarm/swarm.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
380380

381381
envsWm := map[string]string{}
382382
envsWm["CDS_FORCE_EXIT"] = "1"
383+
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
383384
envsWm["CDS_API"] = udataParam.API
384385
envsWm["CDS_TOKEN"] = udataParam.Token
385386
envsWm["CDS_NAME"] = udataParam.Name

engine/worker/cmd_run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
229229
var requirementsOK, pluginsOK bool
230230
var t string
231231
if exceptJobID != j.ID && w.bookedWJobID == 0 { // If we already check the requirements before and it was OK
232-
requirementsOK, _ = checkRequirements(w, &j.Job.Action, nil, j.ID)
232+
requirementsOK, _ = checkRequirements(w, &j.Job.Action, j.ID)
233233
if j.ID == w.bookedWJobID {
234234
t = ", this was my booked job"
235235
}
@@ -304,7 +304,7 @@ func (w *currentWorker) processBookedWJob(ctx context.Context, wjobs chan<- sdk.
304304
return sdk.WrapError(err, "Unable to load workflow node job %d", w.bookedWJobID)
305305
}
306306

307-
requirementsOK, errRequirements := checkRequirements(w, &wjob.Job.Action, nil, wjob.ID)
307+
requirementsOK, errRequirements := checkRequirements(w, &wjob.Job.Action, wjob.ID)
308308
if !requirementsOK {
309309
var details string
310310
for _, r := range errRequirements {

engine/worker/requirement.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,31 +28,12 @@ var requirementCheckFuncs = map[string]func(w *currentWorker, r sdk.Requirement)
2828
sdk.OSArchRequirement: checkOSArchRequirement,
2929
}
3030

31-
func checkRequirements(w *currentWorker, a *sdk.Action, execGroups []sdk.Group, bookedJobID int64) (bool, []sdk.Requirement) {
31+
func checkRequirements(w *currentWorker, a *sdk.Action, bookedJobID int64) (bool, []sdk.Requirement) {
3232
requirementsOK := true
3333
errRequirements := []sdk.Requirement{}
3434

3535
log.Debug("checkRequirements> for JobID:%d model of worker: %s", bookedJobID, w.model.Name)
3636

37-
// DEPRECATED
38-
// this code is useful for pipelineBuildJob
39-
// with CDS Workflows, the queue contains only jobs executable by worker
40-
// after removing pbBuildJob, check execGroups here can be removed
41-
if execGroups != nil && len(execGroups) > 0 && w.model.GroupID > 0 {
42-
checkGroup := false
43-
for _, g := range execGroups {
44-
if g.ID == w.model.GroupID {
45-
checkGroup = true
46-
break
47-
}
48-
}
49-
if !checkGroup {
50-
requirementsOK = false
51-
log.Debug("checkRequirements> model %s attached to group %d can't run this job", w.model.Name, w.model.GroupID)
52-
return requirementsOK, nil
53-
}
54-
}
55-
5637
log.Debug("requirements for %s >>> %+v\n", a.Name, a.Requirements)
5738
for _, r := range a.Requirements {
5839
ok, err := checkRequirement(w, r)
@@ -177,19 +158,33 @@ func checkServiceRequirement(w *currentWorker, r sdk.Requirement) (bool, error)
177158
}
178159

179160
func checkMemoryRequirement(w *currentWorker, r sdk.Requirement) (bool, error) {
180-
v, err := mem.VirtualMemory()
161+
var totalMemory int64
162+
neededMemory, err := strconv.ParseInt(r.Value, 10, 64)
181163
if err != nil {
182164
return false, err
183165
}
184-
totalMemory := v.Total
185166

186-
neededMemory, err := strconv.ParseInt(r.Value, 10, 64)
187-
if err != nil {
188-
return false, err
167+
switch w.model.Type {
168+
// Check env variables in a docker is safer than mem.VirtualMemory
169+
case sdk.Docker:
170+
var err error
171+
// Useful for provisioned worker
172+
memoryEnv := os.Getenv("CDS_MODEL_MEMORY")
173+
totalMemory, err = strconv.ParseInt(memoryEnv, 10, 64)
174+
if err != nil {
175+
return false, err
176+
}
177+
totalMemory = totalMemory * 1024 * 1024
178+
default:
179+
v, err := mem.VirtualMemory()
180+
if err != nil {
181+
return false, err
182+
}
183+
totalMemory = int64(v.Total)
189184
}
190185
//Assuming memory is in megabytes
191186
//If we have more than 90% of neededMemory, lets do it
192-
return int64(totalMemory) >= (neededMemory*1024*1024)*90/100, nil
187+
return totalMemory >= (neededMemory*1024*1024)*90/100, nil
193188
}
194189

195190
func checkVolumeRequirement(w *currentWorker, r sdk.Requirement) (bool, error) {

0 commit comments

Comments
 (0)