Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## main / unreleased

* [CHANGE] Rename metric `rollout_operator_request_invalid_cluster_validation_labels_total` to `rollout_operator_client_invalid_cluster_validation_label_requests_total`. #217
* [BUGFIX] Use a StatefulSet's `.spec.serviceName` when constructing the prepare-downscale endpoint for a pod. #221

## v0.26.0

Expand Down
107 changes: 57 additions & 50 deletions pkg/admission/prep_downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,18 @@ func prepareDownscale(ctx context.Context, l log.Logger, ar admissionv1.Admissio
return response
}

// Get the labels and annotations from the old object including the prepare downscale label
lbls, annotations, err := getLabelsAndAnnotations(ctx, ar, api, oldInfo)
stsPrepareInfo, err := getStatefulSetPrepareInfo(ctx, ar, api, oldInfo)
if err != nil {
return allowWarn(logger, fmt.Sprintf("%s, allowing the change", err))
}

// Since it's a downscale, check if the resource has the label that indicates it needs to be prepared to be downscaled.
if lbls[config.PrepareDownscaleLabelKey] != config.PrepareDownscaleLabelValue {
// Not labeled, nothing to do.
// Since it's a downscale, check if the resource needs to be prepared to be downscaled.
if !stsPrepareInfo.prepareDownscale {
// Nothing to do.
return &admissionv1.AdmissionResponse{Allowed: true}
}

port := annotations[config.PrepareDownscalePortAnnotationKey]
if port == "" {
if stsPrepareInfo.port == "" {
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v annotation is not set or empty", config.PrepareDownscalePortAnnotationKey))
return deny(
fmt.Sprintf(
Expand All @@ -109,8 +107,7 @@ func prepareDownscale(ctx context.Context, l log.Logger, ar admissionv1.Admissio
)
}

path := annotations[config.PrepareDownscalePathAnnotationKey]
if path == "" {
if stsPrepareInfo.path == "" {
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v annotation is not set or empty", config.PrepareDownscalePathAnnotationKey))
return deny(
fmt.Sprintf(
Expand All @@ -120,9 +117,18 @@ func prepareDownscale(ctx context.Context, l log.Logger, ar admissionv1.Admissio
)
}

rolloutGroup := lbls[config.RolloutGroupLabelKey]
if rolloutGroup != "" {
stsList, err := findStatefulSetsForRolloutGroup(ctx, api, ar.Request.Namespace, rolloutGroup)
if stsPrepareInfo.serviceName == "" {
level.Warn(logger).Log("msg", "downscale not allowed because the serviceName is not set or empty")
return deny(
fmt.Sprintf(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because the serviceName is not set or empty.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldInfo.replicas, *newInfo.replicas,
),
)
}

if stsPrepareInfo.rolloutGroup != "" {
stsList, err := findStatefulSetsForRolloutGroup(ctx, api, ar.Request.Namespace, stsPrepareInfo.rolloutGroup)
if err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while finding other statefulsets", "err", err)
return deny(
Expand Down Expand Up @@ -164,7 +170,7 @@ func prepareDownscale(ctx context.Context, l log.Logger, ar admissionv1.Admissio
}

// It's a downscale, so we need to prepare the pods that are going away for shutdown.
eps := createEndpoints(ar, oldInfo, newInfo, port, path)
eps := createEndpoints(ar, oldInfo, newInfo, stsPrepareInfo.port, stsPrepareInfo.path, stsPrepareInfo.serviceName)

if err := sendPrepareShutdownRequests(ctx, logger, client, eps); err != nil {
// Down-scale operation is disallowed because at least one pod failed to
Expand Down Expand Up @@ -206,6 +212,38 @@ func prepareDownscale(ctx context.Context, l log.Logger, ar admissionv1.Admissio
}
}

// statefulSetPrepareInfo holds the prepare-downscale configuration extracted
// from a StatefulSet's labels, annotations, and spec. It is produced by
// getStatefulSetPrepareInfo and consumed by the downscale admission handlers.
type statefulSetPrepareInfo struct {
	prepareDownscale bool   // true when the prepare-downscale label equals the expected value
	port             string // value of the prepare-downscale port annotation (may be empty)
	path             string // value of the prepare-downscale path annotation (may be empty)
	rolloutGroup     string // value of the rollout-group label; empty when the label is unset
	serviceName      string // the StatefulSet's .spec.serviceName, used to build pod endpoint URLs
}

// getStatefulSetPrepareInfo resolves the StatefulSet behind the admission
// request and extracts the prepare-downscale configuration from its labels,
// annotations, and spec.
//
// When the reviewed object is itself a StatefulSet it is used directly; when
// it is a Scale subresource, the owning StatefulSet is fetched via the API.
// Any other object type yields an error.
func getStatefulSetPrepareInfo(ctx context.Context, ar admissionv1.AdmissionReview, api kubernetes.Interface, info *objectInfo) (*statefulSetPrepareInfo, error) {
	var sts *appsv1.StatefulSet

	switch obj := info.obj.(type) {
	case *appsv1.StatefulSet:
		sts = obj
	case *autoscalingv1.Scale:
		// The Scale subresource carries no labels/annotations of interest,
		// so look up the underlying StatefulSet.
		fetched, err := getStatefulSet(ctx, ar, api)
		if err != nil {
			return nil, err
		}
		sts = fetched
	default:
		return nil, fmt.Errorf("unsupported type %s (go type %T)", info.gvk, info.obj)
	}

	prepare := sts.Labels[config.PrepareDownscaleLabelKey] == config.PrepareDownscaleLabelValue
	return &statefulSetPrepareInfo{
		prepareDownscale: prepare,
		port:             sts.Annotations[config.PrepareDownscalePortAnnotationKey],
		path:             sts.Annotations[config.PrepareDownscalePathAnnotationKey],
		rolloutGroup:     sts.Labels[config.RolloutGroupLabelKey],
		serviceName:      sts.Spec.ServiceName,
	}, nil
}

// deny returns a *v1.AdmissionResponse with Allowed: false and the message provided
func deny(msg string) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Expand All @@ -216,8 +254,8 @@ func deny(msg string) *admissionv1.AdmissionResponse {
}
}

func getResourceAnnotations(ctx context.Context, ar admissionv1.AdmissionReview, api kubernetes.Interface) (map[string]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "admission.getResourceAnnotations()")
func getStatefulSet(ctx context.Context, ar admissionv1.AdmissionReview, api kubernetes.Interface) (*appsv1.StatefulSet, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "admission.getStatefulSet()")
defer span.Finish()

span.SetTag("object.namespace", ar.Request.Namespace)
Expand All @@ -229,7 +267,7 @@ func getResourceAnnotations(ctx context.Context, ar admissionv1.AdmissionReview,
if err != nil {
return nil, err
}
return obj.Annotations, nil
return obj, nil
}
return nil, fmt.Errorf("unsupported resource %s", ar.Request.Resource.Resource)
}
Expand Down Expand Up @@ -428,51 +466,20 @@ func checkReplicasChange(logger log.Logger, oldInfo, newInfo *objectInfo) *admis
return nil
}

func getLabelsAndAnnotations(ctx context.Context, ar admissionv1.AdmissionReview, api kubernetes.Interface, info *objectInfo) (map[string]string, map[string]string, error) {
var lbls, annotations map[string]string
var err error

switch o := info.obj.(type) {
case *appsv1.Deployment:
lbls = o.Labels
annotations = o.Annotations
case *appsv1.StatefulSet:
lbls = o.Labels
annotations = o.Annotations
case *appsv1.ReplicaSet:
lbls = o.Labels
annotations = o.Annotations
case *autoscalingv1.Scale:
lbls, err = getResourceLabels(ctx, ar, api)
if err != nil {
return nil, nil, err
}
annotations, err = getResourceAnnotations(ctx, ar, api)
if err != nil {
return nil, nil, err
}
default:
return nil, nil, fmt.Errorf("unsupported type %T", o)
}

return lbls, annotations, nil
}

func createEndpoints(ar admissionv1.AdmissionReview, oldInfo, newInfo *objectInfo, port, path string) []endpoint {
func createEndpoints(ar admissionv1.AdmissionReview, oldInfo, newInfo *objectInfo, port, path, serviceName string) []endpoint {
diff := (*oldInfo.replicas - *newInfo.replicas)
eps := make([]endpoint, diff)

// The DNS entry for a pod of a stateful set is
// ingester-zone-a-0.$(servicename).$(namespace).svc.cluster.local
// The service in this case is ingester-zone-a as well.
// https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id

for i := 0; i < int(diff); i++ {
for i := range int(diff) {
index := int(*oldInfo.replicas) - i - 1 // nr in statefulset
eps[i].url = fmt.Sprintf("%v-%v.%v.%v.svc.cluster.local:%s/%s",
ar.Request.Name, // pod name
index,
ar.Request.Name, // svc name
serviceName,
ar.Request.Namespace,
port,
path,
Expand Down
72 changes: 41 additions & 31 deletions pkg/admission/prep_downscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,46 +873,54 @@ func TestCheckReplicasChange(t *testing.T) {
}
}

func TestGetLabelsAndAnnotations(t *testing.T) {
func TestGetStatefulSetPrepareInfo(t *testing.T) {
ctx := context.Background()
ar := admissionv1.AdmissionReview{}
api := fake.NewSimpleClientset()

tests := []struct {
name string
info *objectInfo
expectedLbl map[string]string
expectedAnn map[string]string
expectErr bool
name string
info *objectInfo
expectInfo *statefulSetPrepareInfo
expectErr bool
}{
{
name: "Deployment",
name: "StatefulSet",
info: &objectInfo{
obj: &appsv1.Deployment{
obj: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"label1": "value1"},
Annotations: map[string]string{"annotation1": "value1"},
Labels: map[string]string{
config.PrepareDownscaleLabelKey: config.PrepareDownscaleLabelValue,
},
Annotations: map[string]string{
config.PrepareDownscalePathAnnotationKey: "path",
config.PrepareDownscalePortAnnotationKey: "port",
},
},
Spec: appsv1.StatefulSetSpec{
ServiceName: "serviceName",
},
},
},
expectedLbl: map[string]string{"label1": "value1"},
expectedAnn: map[string]string{"annotation1": "value1"},
expectErr: false,
expectInfo: &statefulSetPrepareInfo{
prepareDownscale: true,
path: "path",
port: "port",
serviceName: "serviceName",
},
expectErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lbls, anns, err := getLabelsAndAnnotations(ctx, ar, api, tt.info)
stsPrepareInfo, err := getStatefulSetPrepareInfo(ctx, ar, api, tt.info)
if (err != nil) != tt.expectErr {
t.Errorf("getLabelsAndAnnotations() error = %v, expectErr %v", err, tt.expectErr)
t.Errorf("getStatefulSetPrepareInfo() error = %v, expectErr %v", err, tt.expectErr)
return
}
if !reflect.DeepEqual(lbls, tt.expectedLbl) {
t.Errorf("getLabelsAndAnnotations() labels = %v, want %v", lbls, tt.expectedLbl)
}
if !reflect.DeepEqual(anns, tt.expectedAnn) {
t.Errorf("getLabelsAndAnnotations() annotations = %v, want %v", anns, tt.expectedAnn)
if !reflect.DeepEqual(stsPrepareInfo, tt.expectInfo) {
t.Errorf("getStatefulSetPrepareInfo() stsPrepareInfo= %v, want %v", stsPrepareInfo, tt.expectInfo)
}
})
}
Expand All @@ -927,12 +935,13 @@ func TestCreateEndpoints(t *testing.T) {
}

tests := []struct {
name string
oldInfo *objectInfo
newInfo *objectInfo
port string
path string
expected []endpoint
name string
oldInfo *objectInfo
newInfo *objectInfo
port string
path string
serviceName string
expected []endpoint
}{
{
name: "downscale by 2",
Expand All @@ -942,15 +951,16 @@ func TestCreateEndpoints(t *testing.T) {
newInfo: &objectInfo{
replicas: func() *int32 { i := int32(3); return &i }(),
},
port: "8080",
path: "prepare-downscale",
port: "8080",
path: "prepare-downscale",
serviceName: "service-name",
expected: []endpoint{
{
url: "test-4.test.default.svc.cluster.local:8080/prepare-downscale",
url: "test-4.service-name.default.svc.cluster.local:8080/prepare-downscale",
index: 4,
},
{
url: "test-3.test.default.svc.cluster.local:8080/prepare-downscale",
url: "test-3.service-name.default.svc.cluster.local:8080/prepare-downscale",
index: 3,
},
},
Expand All @@ -959,7 +969,7 @@ func TestCreateEndpoints(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := createEndpoints(ar, tt.oldInfo, tt.newInfo, tt.port, tt.path)
actual := createEndpoints(ar, tt.oldInfo, tt.newInfo, tt.port, tt.path, tt.serviceName)
if len(actual) != len(tt.expected) {
t.Errorf("createEndpoints() = %v, want %v", actual, tt.expected)
return
Expand Down
30 changes: 18 additions & 12 deletions pkg/admission/zone_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,17 @@ func (zt *zoneTracker) prepareDownscale(ctx context.Context, l log.Logger, ar ad
return response
}

// Get the labels and annotations from the old object including the prepare downscale label
lbls, annotations, err := getLabelsAndAnnotations(ctx, ar, api, oldInfo)
stsPrepareInfo, err := getStatefulSetPrepareInfo(ctx, ar, api, oldInfo)
if err != nil {
return allowWarn(logger, fmt.Sprintf("%s, allowing the change", err))
}

if lbls[config.PrepareDownscaleLabelKey] != config.PrepareDownscaleLabelValue {
// Not labeled, nothing to do.
if !stsPrepareInfo.prepareDownscale {
// Nothing to do.
return &admissionv1.AdmissionResponse{Allowed: true}
}

port := annotations[config.PrepareDownscalePortAnnotationKey]
if port == "" {
if stsPrepareInfo.port == "" {
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v annotation is not set or empty", config.PrepareDownscalePortAnnotationKey))
return deny(
fmt.Sprintf(
Expand All @@ -89,8 +87,7 @@ func (zt *zoneTracker) prepareDownscale(ctx context.Context, l log.Logger, ar ad
)
}

path := annotations[config.PrepareDownscalePathAnnotationKey]
if path == "" {
if stsPrepareInfo.path == "" {
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v annotation is not set or empty", config.PrepareDownscalePathAnnotationKey))
return deny(
fmt.Sprintf(
Expand All @@ -100,9 +97,18 @@ func (zt *zoneTracker) prepareDownscale(ctx context.Context, l log.Logger, ar ad
)
}

rolloutGroup := lbls[config.RolloutGroupLabelKey]
if rolloutGroup != "" {
stsList, err := findStatefulSetsForRolloutGroup(ctx, api, ar.Request.Namespace, rolloutGroup)
if stsPrepareInfo.serviceName == "" {
level.Warn(logger).Log("msg", "downscale not allowed because the serviceName is not set or empty")
return deny(
fmt.Sprintf(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because the serviceName is not set or empty.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldInfo.replicas, *newInfo.replicas,
),
)
}

if stsPrepareInfo.rolloutGroup != "" {
stsList, err := findStatefulSetsForRolloutGroup(ctx, api, ar.Request.Namespace, stsPrepareInfo.rolloutGroup)
if err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while finding other statefulsets", "err", err)
return deny(
Expand Down Expand Up @@ -154,7 +160,7 @@ func (zt *zoneTracker) prepareDownscale(ctx context.Context, l log.Logger, ar ad
}

// It's a downscale, so we need to prepare the pods that are going away for shutdown.
eps := createEndpoints(ar, oldInfo, newInfo, port, path)
eps := createEndpoints(ar, oldInfo, newInfo, stsPrepareInfo.port, stsPrepareInfo.path, stsPrepareInfo.serviceName)

err = sendPrepareShutdownRequests(ctx, logger, client, eps)
if err != nil {
Expand Down