Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions engine/hatchery/kubernetes/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ func NewHatcheryKubernetesTest(t *testing.T) *HatcheryKubernetes {
h.kubeClient = &kubernetesClient{clientSet}
gock.InterceptClient(clientSet.CoreV1().RESTClient().(*rest.RESTClient).Client)

h.Config.Name = "kyubi"
h.Config.Namespace = "hachibi"
h.Config.Name = "my-hatchery"
h.Config.Namespace = "cds-workers"
h.ServiceInstance = &sdk.Service{
CanonicalService: sdk.CanonicalService{
ID: 1,
Name: "kyubi",
Name: "my-hatchery",
},
}
return h
Expand Down
40 changes: 34 additions & 6 deletions engine/hatchery/kubernetes/kill_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -17,20 +18,47 @@ import (
)

func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error {
pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_WORKER})
pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME),
})
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return err
}

var globalErr error
for _, pod := range pods.Items {
annotations := pod.GetAnnotations()
labels := pod.GetLabels()
toDelete := false
for _, container := range pod.Status.ContainerStatuses {
if labels == nil {
continue
}

if (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error")) ||
(container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull") {
toDelete = true
var toDelete, found bool
for _, w := range workers {
if workerName, ok := labels[LABEL_WORKER_NAME]; ok && workerName == w.Name {
found = true
break
}
}
if !found {
toDelete = true
}

if !toDelete {
for _, container := range pod.Status.ContainerStatuses {
terminated := (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error"))
errImagePull := (container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull")
if terminated || errImagePull {
toDelete = true
break
}
}
}

Expand Down
57 changes: 45 additions & 12 deletions engine/hatchery/kubernetes/kill_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"gopkg.in/h2non/gock.v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ovh/cds/sdk"
)

func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
Expand All @@ -19,8 +21,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
Namespace: "kyubi",
Name: "worker-1",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-1",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -36,15 +42,23 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "wrong",
Namespace: "kyubi",
Name: "worker-2",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-2",
},
},
Spec: v1.PodSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
Namespace: "kyubi",
Name: "worker-3",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-3",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -60,8 +74,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "w3",
Namespace: "kyubi",
Name: "worker-4",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-4",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -75,13 +93,28 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "worker-5",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-5",
},
},
},
},
}
gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList)
gock.New("http://lolcat.kube").Get("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(podsList)

gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-1").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-2").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-3").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-4").Reply(http.StatusOK).JSON(nil)

gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w1").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w2").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w3").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.api").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{
Name: "worker-5",
}})

err := h.killAwolWorkers(context.TODO())
require.NoError(t, err)
Expand Down
87 changes: 51 additions & 36 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/hatchery"
"github.com/ovh/cds/sdk/slug"
)

// New instanciates a new hatchery local
Expand Down Expand Up @@ -99,8 +100,12 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {
// Status returns sdk.MonitoringStatus, implements interface service.Service
func (h *HatcheryKubernetes) Status(ctx context.Context) *sdk.MonitoringStatus {
m := h.NewMonitoringStatus()
m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})

ws, err := h.WorkersStarted(ctx)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Warn(ctx, err.Error())
}
m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})
return m
}

Expand Down Expand Up @@ -171,11 +176,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
}

label := "execution"
if spawnArgs.RegisterOnly {
label = "register"
}

var logJob string
if spawnArgs.JobID > 0 {
logJob = fmt.Sprintf("for workflow job %d,", spawnArgs.JobID)
Expand Down Expand Up @@ -221,7 +221,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
envsWm := workerConfig.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_FROM_WORKER_IMAGE"] = "true"
envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64()

for envName, envValue := range spawnArgs.Model.ModelDocker.Envs {
envsWm[envName] = envValue
Expand All @@ -239,16 +238,33 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
pullPolicy = "Always"
}

// Create secret for worker config
configSecretName, err := h.createConfigSecret(ctx, workerConfig)
if err != nil {
return sdk.WrapError(err, "cannot create secret for config %s", workerConfig.Name)
}
envs = append(envs, apiv1.EnvVar{
Name: "CDS_CONFIG",
ValueFrom: &apiv1.EnvVarSource{
SecretKeyRef: &apiv1.SecretKeySelector{
LocalObjectReference: apiv1.LocalObjectReference{
Name: configSecretName,
},
Key: "CDS_CONFIG",
},
},
})

var gracePeriodSecs int64
podSchema := apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: spawnArgs.WorkerName,
Namespace: h.Config.Namespace,
DeletionGracePeriodSeconds: &gracePeriodSecs,
Labels: map[string]string{
LABEL_WORKER: label,
LABEL_WORKER_MODEL: strings.ToLower(spawnArgs.Model.Name),
LABEL_HATCHERY_NAME: h.Configuration().Name,
LABEL_HATCHERY_NAME: h.Configuration().Name,
LABEL_WORKER_NAME: workerConfig.Name,
LABEL_WORKER_MODEL_PATH: slug.Convert(spawnArgs.Model.Path()),
},
Annotations: map[string]string{},
},
Expand All @@ -273,6 +289,15 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
},
}

// Check here to add secret if needed
if spawnArgs.Model.ModelDocker.Private {
secretRegistryName, err := h.createRegistrySecret(ctx, *spawnArgs.Model)
if err != nil {
return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path())
}
podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretRegistryName}}
}

var services []sdk.Requirement
for _, req := range spawnArgs.Requirements {
if req.Type == sdk.ServiceRequirement {
Expand All @@ -286,16 +311,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
podSchema.Spec.HostAliases[0].Hostnames[0] = "worker"
}

// Check here to add secret if needed
secretName := "cds-credreg-" + spawnArgs.Model.Name
if spawnArgs.Model.ModelDocker.Private {
if err := h.createSecret(ctx, secretName, *spawnArgs.Model); err != nil {
return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path())
}
podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretName}}
podSchema.ObjectMeta.Labels[LABEL_SECRET] = secretName
}

for i, serv := range services {
//name= <alias> => the name of the host put in /etc/hosts of the worker
//value= "postgres:latest env_1=blabla env_2=blabla"" => we can add env variables in requirement name
Expand Down Expand Up @@ -345,7 +360,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
podSchema.Spec.HostAliases[0].Hostnames[i+1] = strings.ToLower(serv.Name)
}

_, err := h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{})
_, err = h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{})
log.Debug(ctx, "hatchery> kubernetes> SpawnWorker> %s > Pod created", spawnArgs.WorkerName)
return sdk.WithStack(err)
}
Expand All @@ -356,20 +371,18 @@ func (h *HatcheryKubernetes) GetLogger() *logrus.Logger {

// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) []string {
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME})
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) ([]string, error) {
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME),
})
if err != nil {
log.Warn(ctx, "WorkersStarted> unable to list pods on namespace %s", h.Config.Namespace)
return nil
return nil, sdk.WrapError(err, "unable to list pods on namespace %s", h.Config.Namespace)
}
workerNames := make([]string, 0, list.Size())
for _, pod := range list.Items {
labels := pod.GetLabels()
if labels[LABEL_HATCHERY_NAME] == h.Configuration().Name {
workerNames = append(workerNames, pod.GetName())
}
workerNames = append(workerNames, pod.GetName())
}
return workerNames
return workerNames, nil
}

// NeedRegistration return true if worker model need regsitration
Expand All @@ -381,31 +394,33 @@ func (h *HatcheryKubernetes) NeedRegistration(_ context.Context, m *sdk.Model) b
}

func (h *HatcheryKubernetes) routines(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
h.GoRoutines.Exec(ctx, "getCDNConfiguration", func(ctx context.Context) {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot get cdn configuration : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get cdn configuration"))
}
})

h.GoRoutines.Exec(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(ctx); err != nil {
log.Error(ctx, "Hatchery> Kubernetes> Cannot get service logs : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get service logs"))
}
})

h.GoRoutines.Exec(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorkers(ctx)
if err := h.killAwolWorkers(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete awol worker"))
}
})

h.GoRoutines.Exec(ctx, "deleteSecrets", func(ctx context.Context) {
if err := h.deleteSecrets(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot handle secrets : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete secrets"))
}
})
case <-ctx.Done():
Expand Down
Loading