Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions source/informers/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
)

func DefaultEventHandler(handlers ...func()) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
if u, ok := obj.(*unstructured.Unstructured); ok {
log.WithFields(log.Fields{
"apiVersion": u.GetAPIVersion(),
"kind": u.GetKind(),
"namespace": u.GetNamespace(),
"name": u.GetName(),
}).Debug("added")
for _, handler := range handlers {
handler()
}
}
},
}
}
50 changes: 50 additions & 0 deletions source/informers/handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestDefaultEventHandler_AddFunc(t *testing.T) {
tests := []struct {
name string
obj any
expected bool
}{
{
name: "calls handler for unstructured object",
obj: &unstructured.Unstructured{},
expected: true,
},
{
name: "does not call handler for unknown object",
obj: "not-unstructured",
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
called := false
handler := DefaultEventHandler(func() { called = true })
handler.OnAdd(tt.obj, true)
if called != tt.expected {
t.Errorf("handler called = %v, want %v", called, tt.expected)
}
})
}
}
73 changes: 44 additions & 29 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,34 +99,39 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
serviceInformer := informerFactory.Core().V1().Services()
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()

// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
_, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
endpointSlicesInformer.Informer().AddEventHandler(
_, _ = endpointSlicesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
nodeInformer.Informer().AddEventHandler(
_, _ = podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)

// Transform the slice into a map so it will be way much easier and fast to filter later
sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
if err != nil {
return nil, err
}

var nodeInformer coreinformers.NodeInformer
if sTypesFilter.isNodeInformerRequired() {
nodeInformer = informerFactory.Core().V1().Nodes()
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
}

// Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) {
Expand Down Expand Up @@ -154,12 +159,6 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
return nil, err
}

// Transform the slice into a map so it will be way much easier and fast to filter later
sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
if err != nil {
return nil, err
}

return &serviceSource{
client: kubeClient,
namespace: namespace,
Expand Down Expand Up @@ -198,7 +197,7 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
return nil, err
}

endpoints := []*endpoint.Endpoint{}
endpoints := make([]*endpoint.Endpoint, 0)

for _, svc := range services {
// Check controller annotation to see if we are responsible.
Expand Down Expand Up @@ -293,11 +292,7 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
selector, err := annotations.ParseFilter(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil
}
Expand Down Expand Up @@ -371,6 +366,10 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
targets := annotations.TargetsFromTargetAnnotation(pod.Annotations)
if len(targets) == 0 {
if endpointsType == EndpointsTypeNodeExternalIP {
if sc.nodeInformer == nil {
log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name)
continue
}
node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName)
if err != nil {
log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err)
Expand Down Expand Up @@ -466,7 +465,8 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End
// endpointsFromService extracts the endpoints from a service object
func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
// Skip endpoints if we do not want entries from annotations

// Skip endpoints if we do not want entries from annotations or service is excluded
if sc.ignoreHostnameAnnotation {
return endpoints
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Servi
}
var result []*v1.Service
for _, service := range services {
if _, ok := sc.serviceTypeFilter.types[service.Spec.Type]; ok {
if sc.serviceTypeFilter.isProcessed(service.Spec.Type) {
result = append(result, service)
}
}
Expand Down Expand Up @@ -808,9 +808,12 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {

// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
_, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
if sc.listenEndpointEvents {
sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
_, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
if sc.serviceTypeFilter.isNodeInformerRequired() {
_, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
}

Expand All @@ -828,20 +831,32 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
enabled: false,
}, nil
}
types := make(map[v1.ServiceType]bool)
result := make(map[v1.ServiceType]bool)
for _, serviceType := range filter {
if _, ok := knownServiceTypes[v1.ServiceType(serviceType)]; !ok {
return nil, fmt.Errorf("unsupported service type filter: %q. Supported types are: %q", serviceType, slices.Collect(maps.Keys(knownServiceTypes)))
}
types[v1.ServiceType(serviceType)] = true
result[v1.ServiceType(serviceType)] = true
}

return &serviceTypes{
enabled: true,
types: types,
types: result,
}, nil
}

func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool {
return !sc.enabled || sc.types[serviceType]
}

func (sc *serviceTypes) isNodeInformerRequired() bool {
if !sc.enabled {
return true
}
_, ok := sc.types[v1.ServiceTypeNodePort]
return ok
}

// conditionToBool converts an EndpointConditions condition to a bool value.
func conditionToBool(v *bool) bool {
if v == nil {
Expand Down
Loading
Loading