Skip to content
Merged
12 changes: 2 additions & 10 deletions cmd/autoscaler/app/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/v3io/scaler/pkg/autoscaler"
"github.com/v3io/scaler/pkg/common"
"github.com/v3io/scaler/pkg/pluginloader"
"github.com/v3io/scaler/pkg/scalertypes"

Expand All @@ -35,7 +36,6 @@ import (
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/metrics/pkg/client/custom_metrics"
)

Expand Down Expand Up @@ -73,7 +73,7 @@ func Run(kubeconfigPath string,
autoScalerOptions = resourceScalerConfig.AutoScalerOptions
}

restConfig, err := getClientConfig(kubeconfigPath)
restConfig, err := common.GetClientConfig(kubeconfigPath)
if err != nil {
return errors.Wrap(err, "Failed to get client configuration")
}
Expand Down Expand Up @@ -119,11 +119,3 @@ func createAutoScaler(restConfig *rest.Config,

return newScaler, nil
}

func getClientConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

return rest.InClusterConfig()
}
17 changes: 16 additions & 1 deletion cmd/dlx/app/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"os"
"time"

"github.com/v3io/scaler/pkg/common"
"github.com/v3io/scaler/pkg/dlx"
"github.com/v3io/scaler/pkg/pluginloader"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
"github.com/nuclio/zap"
"k8s.io/client-go/kubernetes"
)

func Run(kubeconfigPath string,
Expand Down Expand Up @@ -75,6 +77,16 @@ func Run(kubeconfigPath string,
dlxOptions = resourceScalerConfig.DLXOptions
}

restConfig, err := common.GetClientConfig(kubeconfigPath)
if err != nil {
return errors.Wrap(err, "Failed to get client configuration")
}

dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig); err != nil {}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return errors.Wrap(err, "Failed to create k8s client set")
}

newDLX, err := createDLX(resourceScaler, dlxOptions)
if err != nil {
return errors.Wrap(err, "Failed to create dlx")
Expand All @@ -88,7 +100,10 @@ func Run(kubeconfigPath string,
select {}
}

func createDLX(resourceScaler scalertypes.ResourceScaler, options scalertypes.DLXOptions) (*dlx.DLX, error) {
func createDLX(
resourceScaler scalertypes.ResourceScaler,
options scalertypes.DLXOptions,
) (*dlx.DLX, error) {
rootLogger, err := nucliozap.NewNuclioZap("scaler",
"console",
nil,
Expand Down
11 changes: 11 additions & 0 deletions pkg/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ package common
import (
"math/rand"
"time"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

var SeededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -38,3 +41,11 @@ func UniquifyStringSlice(stringList []string) []string {
}
return list
}

func GetClientConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

return rest.InClusterConfig()
}
26 changes: 26 additions & 0 deletions pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"context"
"net/http"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/kube"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
Expand All @@ -34,6 +36,7 @@ type DLX struct {
logger logger.Logger
handler Handler
server *http.Server
watcher *kube.IngressWatcher
}

func NewDLX(parentLogger logger.Logger,
Expand All @@ -50,6 +53,7 @@ func NewDLX(parentLogger logger.Logger,
return nil, errors.Wrap(err, "Failed to create function starter")
}

cache := ingresscache.NewIngressCache(childLogger)
handler, err := NewHandler(childLogger,
resourceStarter,
resourceScaler,
Expand All @@ -61,23 +65,45 @@ func NewDLX(parentLogger logger.Logger,
return nil, errors.Wrap(err, "Failed to create handler")
}

watcher, err := kube.NewIngressWatcher(
context.Background(),
childLogger,
options.KubeClientSet,
cache,
options.ResolveTargetsFromIngressCallback,
options.ResyncInterval,
options.Namespace,
options.LabelSelector,
)
if err != nil {
return nil, errors.Wrap(err, "Failed to create ingress watcher")
}

return &DLX{
logger: childLogger,
handler: handler,
server: &http.Server{
Addr: options.ListenAddress,
},
watcher: watcher,
}, nil
}

func (d *DLX) Start() error {
d.logger.DebugWith("Starting", "server", d.server.Addr)
http.HandleFunc("/", d.handler.HandleFunc)

// Start the ingress watcher synchronously to ensure cache is fully synced before DLX begins handling traffic
if err := d.watcher.Start(); err != nil {
return errors.Wrap(err, "Failed to start ingress watcher")
}

go d.server.ListenAndServe() // nolint: errcheck
return nil
}

func (d *DLX) Stop(context context.Context) error {
d.logger.DebugWith("Stopping", "server", d.server.Addr)
d.watcher.Stop()
return d.server.Shutdown(context)
}
2 changes: 1 addition & 1 deletion pkg/ingresscache/ingresscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type IngressCache struct {
func NewIngressCache(logger logger.Logger) *IngressCache {
return &IngressCache{
syncMap: &sync.Map{},
logger: logger,
logger: logger.GetChild("cache"),
}
}

Expand Down
73 changes: 37 additions & 36 deletions pkg/kube/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package kube

import (
"context"
"time"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand All @@ -35,29 +35,6 @@ import (
"k8s.io/client-go/tools/cache"
)

const (
defaultResyncInterval = 30 * time.Second
)

// ResolveTargetsFromIngressCallback defines a function that extracts a list of target identifiers
// (e.g., names of services the Ingress routes traffic to) from a Kubernetes Ingress resource.
//
// This function is expected to be implemented externally and passed into the IngressWatcher,
// allowing for custom logic such as parsing annotations, labels, or other ingress metadata.
//
// Parameters:
// - ingress: The Kubernetes Ingress resource to extract targets from
//
// Returns:
// - []string: A slice of target identifiers (e.g., service names, endpoint addresses)
// - error: An error if target resolution fails
//
// Implementation guidelines:
// - Return a non-nil slice when targets are successfully resolved
// - Return a non-nil error if resolution fails
// - Should handle nil or malformed Ingress objects gracefully and return an error in such cases
type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error)

type ingressValue struct {
name string
host string
Expand All @@ -68,31 +45,33 @@ type ingressValue struct {
// IngressWatcher watches for changes in Kubernetes Ingress resources and updates the ingress cache accordingly
type IngressWatcher struct {
ctx context.Context
cancel context.CancelFunc
logger logger.Logger
cache ingresscache.IngressHostCache
factory informers.SharedInformerFactory
informer cache.SharedIndexInformer
resolveTargetsCallback ResolveTargetsFromIngressCallback
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback
}

func NewIngressWatcher(
ctx context.Context,
dlxCtx context.Context,
dlxLogger logger.Logger,
kubeClient kubernetes.Interface,
ingressCache ingresscache.IngressCache,
resolveTargetsCallback ResolveTargetsFromIngressCallback,
resyncTimeout *time.Duration,
ingressCache ingresscache.IngressHostCache,
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback,
resyncInterval scalertypes.Duration,
namespace string,
labelSelector string,
) (*IngressWatcher, error) {
if resyncTimeout == nil {
defaultTimeout := defaultResyncInterval
resyncTimeout = &defaultTimeout
if resyncInterval.Duration == 0 {
resyncInterval = scalertypes.Duration{Duration: scalertypes.DefaultResyncInterval}
}

ctxWithCancel, cancel := context.WithCancel(dlxCtx)

factory := informers.NewSharedInformerFactoryWithOptions(
kubeClient,
*resyncTimeout,
resyncInterval.Duration,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector
Expand All @@ -101,9 +80,10 @@ func NewIngressWatcher(
ingressInformer := factory.Networking().V1().Ingresses().Informer()

ingressWatcher := &IngressWatcher{
ctx: ctx,
ctx: ctxWithCancel,
cancel: cancel,
logger: dlxLogger.GetChild("watcher"),
cache: &ingressCache,
cache: ingressCache,
factory: factory,
informer: ingressInformer,
resolveTargetsCallback: resolveTargetsCallback,
Expand Down Expand Up @@ -135,6 +115,7 @@ func (iw *IngressWatcher) Start() error {

func (iw *IngressWatcher) Stop() {
iw.logger.Info("Stopping ingress watcher")
iw.cancel()
iw.factory.Shutdown()
}

Expand Down Expand Up @@ -167,6 +148,26 @@ func (iw *IngressWatcher) AddHandler(obj interface{}) {
}

func (iw *IngressWatcher) UpdateHandler(oldObj, newObj interface{}) {
oldIngressResource, ok := oldObj.(*networkingv1.Ingress)
if !ok {
iw.logger.DebugWith("Failed to cast old object to Ingress",
"object", oldObj)
return
}

newIngressResource, ok := newObj.(*networkingv1.Ingress)
if !ok {
iw.logger.DebugWith("Failed to cast new object to Ingress",
"object", newObj)
return
}

// ResourceVersion is managed by Kubernetes and indicates whether the resource has changed.
// Comparing resourceVersion helps avoid unnecessary updates triggered by periodic informer resync
if oldIngressResource.ResourceVersion == newIngressResource.ResourceVersion {
return
}

oldIngress, err := iw.extractValuesFromIngressResource(oldObj)
if err != nil {
iw.logger.DebugWith("Update ingress handler - failed to extract values from old object",
Expand Down Expand Up @@ -249,7 +250,7 @@ func (iw *IngressWatcher) extractValuesFromIngressResource(obj interface{}) (*in

targets, err := iw.resolveTargetsCallback(ingress)
if err != nil {
return nil, errors.Wrap(err, "Failed to extract targets from ingress labels")
return nil, errors.Wrapf(err, "Failed to extract targets from ingress labels: %s", err.Error())
}

if len(targets) == 0 {
Expand Down
Loading