Skip to content

Commit 17023bf

Browse files
rakesh-garimellaFiery-Fenix
authored andcommitted
[receiver/k8s cluster receiver]add support for K8s leader extension into k8s cluster receiver (open-telemetry#38429)
#### Description
- Add support for the k8s leader elector extension to the k8s cluster receiver so that multiple instances of the k8s cluster receiver can be run.
- Only the instance which holds the lease fetches the metrics.

Example config:
```yaml
extensions:
  k8s_leader_elector:
    auth_type: kubeConfig
    lease_name: foo
    lease_namespace: default
receivers:
  k8s_cluster:
    k8s_leader_elector: k8s_leader_elector
    node_conditions_to_report: [Ready, MemoryPressure]
    allocatable_types_to_report: [cpu, memory]
exporters:
  debug:
    verbosity: detailed
service:
  extensions: [k8s_leader_elector]
  pipelines:
    metrics:
      receivers: [k8s_cluster]
      exporters: [debug]
  telemetry:
    logs:
      level: info
```
1 parent 84477b8 commit 17023bf

File tree

9 files changed

+207
-17
lines changed

9 files changed

+207
-17
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sclusterreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "add support for k8s leader election in k8s cluster receiver"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38429]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Allows multiple instances of the k8s cluster receiver to run in HA mode in a single cluster.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/k8sleaderelector/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
go.opentelemetry.io/collector/confmap v1.30.0
1212
go.opentelemetry.io/collector/extension v1.30.0
1313
go.opentelemetry.io/collector/extension/extensiontest v0.124.0
14+
go.opentelemetry.io/collector/pipeline v0.124.0
1415
go.uber.org/goleak v1.3.0
1516
go.uber.org/zap v1.27.0
1617
k8s.io/apimachinery v0.32.3
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package k8sleaderelectortest // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/pipeline"
11+
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
13+
)
14+
15+
type FakeHost struct {
16+
FakeLeaderElection *FakeLeaderElection
17+
}
18+
19+
func (fh *FakeHost) GetExtensions() map[component.ID]component.Component {
20+
extID := component.MustNewID("k8s_leader_elector")
21+
return map[component.ID]component.Component{
22+
extID: fh.FakeLeaderElection,
23+
}
24+
}
25+
26+
func (fh *FakeHost) GetExporters() map[pipeline.Signal]map[component.ID]component.Component {
27+
return nil
28+
}
29+
30+
type FakeLeaderElection struct {
31+
OnLeading func(context.Context)
32+
OnStopping func()
33+
}
34+
35+
func (fle *FakeLeaderElection) SetCallBackFuncs(onLeading k8sleaderelector.StartCallback, onStopping k8sleaderelector.StopCallback) {
36+
fle.OnLeading = onLeading
37+
fle.OnStopping = onStopping
38+
}
39+
40+
func (fle *FakeLeaderElection) InvokeOnLeading() {
41+
if fle.OnLeading != nil {
42+
fle.OnLeading(context.Background())
43+
}
44+
}
45+
46+
func (fle *FakeLeaderElection) Start(_ context.Context, _ component.Host) error { return nil }
47+
48+
func (fle *FakeLeaderElection) Shutdown(_ context.Context) error { return nil }
49+
50+
func (fle *FakeLeaderElection) InvokeOnStopping() {
51+
if fle.OnStopping != nil {
52+
fle.OnStopping()
53+
}
54+
}

receiver/k8sclusterreceiver/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ Example:
6767
```yaml
6868
k8s_cluster:
6969
auth_type: kubeConfig
70+
k8s_leader_elector: <reference k8s leader elector extension>
7071
node_conditions_to_report: [Ready, MemoryPressure]
7172
allocatable_types_to_report: [cpu, memory]
7273
metrics:
@@ -80,6 +81,16 @@ Example:
8081
The full list of settings exposed for this receiver are documented in [config.go](./config.go)
8182
with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml).
8283
84+
### k8s_leader_elector
85+
Provide the name of the k8s leader elector extension defined in the config. This allows multiple instances of the k8s cluster
86+
receiver to be executed on a cluster. At any given time, only the pod which holds the lease is active.
87+
88+
```yaml
89+
k8s_cluster:
90+
k8s_leader_elector: k8s_leader_elector
91+
...
92+
```
93+
8394
### node_conditions_to_report
8495

8596
For example, with the config below the receiver will emit two metrics
@@ -96,6 +107,7 @@ k8s_cluster:
96107
...
97108
```
98109

110+
99111
### metadata_exporters
100112

101113
A list of metadata exporters to which metadata being collected by this receiver

receiver/k8sclusterreceiver/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"fmt"
88
"time"
99

10+
"go.opentelemetry.io/collector/component"
11+
1012
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
1113
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
1214
)
@@ -44,6 +46,10 @@ type Config struct {
4446
// will not be able to be observed. Setting this option is recommended in environments where due to security restrictions
4547
// the collector cannot be granted cluster-wide permissions.
4648
Namespace string `mapstructure:"namespace"`
49+
50+
// K8sLeaderElector defines the reference to the k8s leader elector extension
51+
// use this when k8s cluster receiver needs to be deployed in HA mode
52+
K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"`
4753
}
4854

4955
func (cfg *Config) Validate() error {

receiver/k8sclusterreceiver/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/google/go-cmp v0.7.0
77
github.com/google/uuid v1.6.0
88
github.com/iancoleman/strcase v0.3.0
9+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.0.0-00010101000000-000000000000
910
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.124.1
1011
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.124.1
1112
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.124.1
@@ -99,6 +100,7 @@ require (
99100
go.opentelemetry.io/collector/config/configtls v1.30.0 // indirect
100101
go.opentelemetry.io/collector/consumer/consumererror v0.124.0 // indirect
101102
go.opentelemetry.io/collector/consumer/xconsumer v0.124.0 // indirect
103+
go.opentelemetry.io/collector/extension v1.30.0 // indirect
102104
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
103105
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
104106
go.opentelemetry.io/collector/internal/sharedcomponent v0.124.0 // indirect
@@ -137,7 +139,7 @@ require (
137139
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
138140
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
139141
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
140-
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
142+
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
141143
sigs.k8s.io/yaml v1.4.0 // indirect
142144
)
143145

@@ -165,3 +167,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xk8stest =
165167
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
166168

167169
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
170+
171+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector

receiver/k8sclusterreceiver/go.sum

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/k8sclusterreceiver/receiver.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
"go.opentelemetry.io/collector/pipeline"
1515
"go.opentelemetry.io/collector/receiver"
1616
"go.opentelemetry.io/collector/receiver/receiverhelper"
17+
"go.uber.org/zap"
1718

19+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
1820
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"
1921
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
2022
)
@@ -42,9 +44,7 @@ type getExporters interface {
4244
GetExporters() map[pipeline.Signal]map[component.ID]component.Component
4345
}
4446

45-
func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
46-
ctx, kr.cancel = context.WithCancel(ctx)
47-
47+
func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.Host) error {
4848
if err := kr.resourceWatcher.initialize(); err != nil {
4949
return err
5050
}
@@ -97,15 +97,58 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er
9797
}
9898
}
9999
}()
100+
return nil
101+
}
102+
103+
func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
104+
ctx, kr.cancel = context.WithCancel(ctx)
105+
106+
// if extension is defined start with k8s leader elector
107+
if kr.config.K8sLeaderElector != nil {
108+
kr.settings.Logger.Info("Starting k8sClusterReceiver with leader election")
109+
extList := host.GetExtensions()
110+
if extList == nil {
111+
return errors.New("extension list is empty")
112+
}
113+
114+
ext := extList[*kr.config.K8sLeaderElector]
115+
if ext == nil {
116+
return errors.New("extension k8s leader elector not found")
117+
}
118+
119+
leaderElectorExt, ok := ext.(k8sleaderelector.LeaderElection)
120+
if !ok {
121+
return errors.New("referenced extension is not k8s leader elector")
122+
}
123+
124+
leaderElectorExt.SetCallBackFuncs(
125+
func(ctx context.Context) {
126+
if err := kr.startReceiver(ctx, host); err != nil {
127+
kr.settings.Logger.Error("Failed to start receiver", zap.Error(err))
128+
}
129+
}, func() {
130+
kr.stopReceiver()
131+
},
132+
)
133+
} else {
134+
kr.settings.Logger.Info("Starting k8sClusterReceiver without leader election")
135+
if err := kr.startReceiver(ctx, host); err != nil {
136+
return err
137+
}
138+
}
100139

101140
return nil
102141
}
103142

104-
func (kr *kubernetesReceiver) Shutdown(context.Context) error {
105-
if kr.cancel == nil {
106-
return nil
143+
func (kr *kubernetesReceiver) stopReceiver() {
144+
kr.settings.Logger.Info("Stopping the receiver")
145+
if kr.cancel != nil {
146+
kr.cancel()
107147
}
108-
kr.cancel()
148+
}
149+
150+
func (kr *kubernetesReceiver) Shutdown(context.Context) error {
151+
kr.stopReceiver()
109152
return nil
110153
}
111154

0 commit comments

Comments
 (0)