Skip to content

Commit a6f867b

Browse files
authored
[receiver/kafkametrics] Implement client reset for recoverable errors in Sarama calls (#41363)
#### Description This change implements client reset functionality to address recoverable errors in Sarama calls, such as connection resets and EOF errors. When a recoverable error is encountered, the client is reset, enabling the scraper to reconnect and resume metric collection seamlessly. #### Link to tracking issue None. #### Testing Existing tests were modified accordingly. #### Documentation None.
1 parent 09b8c96 commit a6f867b

File tree

8 files changed

+121
-10
lines changed

8 files changed

+121
-10
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkametricsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement client reset for recoverable errors in Sarama calls
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [41363]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This change implements client reset functionality to address recoverable errors in Sarama calls, such as connection resets and EOF errors. When a recoverable error is encountered, the client is reset, enabling the scraper to reconnect and resume metric collection seamlessly.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

receiver/kafkametricsreceiver/broker_scraper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (s *brokerScraper) shutdown(context.Context) error {
4848
func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
4949
scrapeErrors := scrapererror.ScrapeErrors{}
5050

51-
if s.client == nil {
51+
if s.client == nil || s.client.Closed() {
5252
client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
5353
if err != nil {
5454
return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)

receiver/kafkametricsreceiver/broker_scraper_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
8888
func TestBrokerScraper_empty_resource_attribute(t *testing.T) {
8989
client := newMockClient()
9090
client.Mock.On("Brokers").Return(testBrokers)
91+
client.Mock.On("Closed").Return(false)
9192
bs := brokerScraper{
9293
client: client,
9394
settings: receivertest.NewNopSettings(metadata.Type),
@@ -108,6 +109,7 @@ func TestBrokerScraper_empty_resource_attribute(t *testing.T) {
108109
func TestBrokerScraper_scrape(t *testing.T) {
109110
client := newMockClient()
110111
client.Mock.On("Brokers").Return(testBrokers)
112+
client.Mock.On("Closed").Return(false)
111113
bs := brokerScraper{
112114
client: client,
113115
settings: receivertest.NewNopSettings(metadata.Type),

receiver/kafkametricsreceiver/consumer_scraper.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"regexp"
10+
"sync"
1011
"time"
1112

1213
"github.com/IBM/sarama"
@@ -28,6 +29,7 @@ type consumerScraper struct {
2829
clusterAdmin sarama.ClusterAdmin
2930
config Config
3031
mb *metadata.MetricsBuilder
32+
mu sync.Mutex
3133
}
3234

3335
func (s *consumerScraper) start(_ context.Context, _ component.Host) error {
@@ -43,7 +45,7 @@ func (s *consumerScraper) shutdown(_ context.Context) error {
4345
}
4446

4547
func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
46-
if s.client == nil {
48+
if s.client == nil || s.client.Closed() {
4749
client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
4850
if err != nil {
4951
return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err)
@@ -64,7 +66,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
6466

6567
cgs, listErr := s.clusterAdmin.ListConsumerGroups()
6668
if listErr != nil {
67-
return pmetric.Metrics{}, listErr
69+
return pmetric.Metrics{}, s.resetClientOnError(listErr)
6870
}
6971

7072
var matchedGrpIDs []string
@@ -76,7 +78,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
7678

7779
allTopics, listErr := s.clusterAdmin.ListTopics()
7880
if listErr != nil {
79-
return pmetric.Metrics{}, listErr
81+
return pmetric.Metrics{}, s.resetClientOnError(listErr)
8082
}
8183

8284
matchedTopics := map[string]sarama.TopicDetail{}
@@ -110,7 +112,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
110112
}
111113
consumerGroups, listErr := s.clusterAdmin.DescribeConsumerGroups(matchedGrpIDs)
112114
if listErr != nil {
113-
return pmetric.Metrics{}, listErr
115+
return pmetric.Metrics{}, s.resetClientOnError(listErr)
114116
}
115117

116118
now := pcommon.NewTimestampFromTime(time.Now())
@@ -186,3 +188,15 @@ func createConsumerScraper(_ context.Context, cfg Config, settings receiver.Sett
186188
scraper.WithShutdown(s.shutdown),
187189
)
188190
}
191+
192+
func (s *consumerScraper) resetClientOnError(err error) error {
193+
if isRecoverableError(err) {
194+
s.mu.Lock()
195+
defer s.mu.Unlock()
196+
s.clusterAdmin.Close()
197+
s.clusterAdmin = nil
198+
return fmt.Errorf("closing client because of reconnection error %w", err)
199+
}
200+
201+
return err
202+
}

receiver/kafkametricsreceiver/consumer_scraper_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,16 @@ func TestConsumerScraper_createScraper_handles_invalid_group_match(t *testing.T)
123123
}
124124

125125
func TestConsumerScraper_scrape(t *testing.T) {
126+
client := newMockClient()
126127
filter := regexp.MustCompile(defaultGroupMatch)
127128
cs := consumerScraper{
128-
client: newMockClient(),
129+
client: client,
129130
settings: receivertest.NewNopSettings(metadata.Type),
130131
clusterAdmin: newMockClusterAdmin(),
131132
topicFilter: filter,
132133
groupFilter: filter,
133134
}
135+
client.Mock.On("Closed").Return(false)
134136
require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost()))
135137
md, err := cs.scrape(t.Context())
136138
assert.NoError(t, err)
@@ -149,36 +151,41 @@ func TestConsumerScraper_scrape_handlesListTopicError(t *testing.T) {
149151
topicFilter: filter,
150152
groupFilter: filter,
151153
}
154+
client.Mock.On("Closed").Return(false)
152155
_, err := cs.scrape(t.Context())
153156
assert.Error(t, err)
154157
}
155158

156159
func TestConsumerScraper_scrape_handlesListConsumerGroupError(t *testing.T) {
160+
client := newMockClient()
157161
filter := regexp.MustCompile(defaultGroupMatch)
158162
clusterAdmin := newMockClusterAdmin()
159163
clusterAdmin.consumerGroups = nil
160164
cs := consumerScraper{
161-
client: newMockClient(),
165+
client: client,
162166
settings: receivertest.NewNopSettings(metadata.Type),
163167
clusterAdmin: clusterAdmin,
164168
topicFilter: filter,
165169
groupFilter: filter,
166170
}
171+
client.Mock.On("Closed").Return(false)
167172
_, err := cs.scrape(t.Context())
168173
assert.Error(t, err)
169174
}
170175

171176
func TestConsumerScraper_scrape_handlesDescribeConsumerError(t *testing.T) {
177+
client := newMockClient()
172178
filter := regexp.MustCompile(defaultGroupMatch)
173179
clusterAdmin := newMockClusterAdmin()
174180
clusterAdmin.consumerGroupDescriptions = nil
175181
cs := consumerScraper{
176-
client: newMockClient(),
182+
client: client,
177183
settings: receivertest.NewNopSettings(metadata.Type),
178184
clusterAdmin: clusterAdmin,
179185
topicFilter: filter,
180186
groupFilter: filter,
181187
}
188+
client.Mock.On("Closed").Return(false)
182189
_, err := cs.scrape(t.Context())
183190
assert.Error(t, err)
184191
}
@@ -196,6 +203,7 @@ func TestConsumerScraper_scrape_handlesOffsetPartialError(t *testing.T) {
196203
topicFilter: filter,
197204
clusterAdmin: clusterAdmin,
198205
}
206+
client.Mock.On("Closed").Return(false)
199207
require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost()))
200208
_, err := cs.scrape(t.Context())
201209
assert.Error(t, err)
@@ -214,6 +222,7 @@ func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) {
214222
topicFilter: filter,
215223
clusterAdmin: clusterAdmin,
216224
}
225+
client.Mock.On("Closed").Return(false)
217226
require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost()))
218227
_, err := cs.scrape(t.Context())
219228
assert.Error(t, err)

receiver/kafkametricsreceiver/receiver.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
10+
"io"
11+
"net"
12+
"os"
13+
"syscall"
914

1015
"github.com/IBM/sarama"
1116
"go.opentelemetry.io/collector/component"
@@ -60,3 +65,38 @@ var newMetricsReceiver = func(
6065
scraperControllerOptions...,
6166
)
6267
}
68+
69+
// isRecoverableError checks if the error can be resolved by re-establishing connection
70+
func isRecoverableError(err error) bool {
71+
if errors.Is(err, sarama.ErrOutOfBrokers) {
72+
return true
73+
}
74+
75+
if errors.Is(err, sarama.ErrClosedClient) {
76+
return true
77+
}
78+
79+
if errors.Is(err, os.ErrDeadlineExceeded) {
80+
// Error example: read tcp 10.2.3.4:62523->4.3.2.1:9093: i/o timeout
81+
return true
82+
}
83+
84+
if errors.Is(err, syscall.EPIPE) {
85+
return true
86+
}
87+
88+
if errors.Is(err, net.ErrClosed) {
89+
return true
90+
}
91+
92+
if errors.Is(err, syscall.ECONNRESET) {
93+
// Error example: write tcp 1.2.3.4:56532->4.3.2.1:9093: write: connection reset by peer
94+
return true
95+
}
96+
97+
if errors.Is(err, io.EOF) {
98+
return true
99+
}
100+
101+
return false
102+
}

receiver/kafkametricsreceiver/topic_scraper.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"regexp"
1010
"strconv"
11+
"sync"
1112
"time"
1213

1314
"github.com/IBM/sarama"
@@ -29,6 +30,7 @@ type topicScraper struct {
2930
topicFilter *regexp.Regexp
3031
config Config
3132
mb *metadata.MetricsBuilder
33+
mu sync.Mutex
3234
}
3335

3436
const (
@@ -50,7 +52,7 @@ func (s *topicScraper) start(_ context.Context, _ component.Host) error {
5052
}
5153

5254
func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
53-
if s.client == nil {
55+
if s.client == nil || s.client.Closed() {
5456
client, err := newSaramaClient(context.Background(), s.config.ClientConfig)
5557
if err != nil {
5658
return pmetric.Metrics{}, fmt.Errorf("failed to create client in topics scraper: %w", err)
@@ -61,7 +63,7 @@ func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
6163
topics, err := s.client.Topics()
6264
if err != nil {
6365
s.settings.Logger.Error("Error fetching cluster topics ", zap.Error(err))
64-
return pmetric.Metrics{}, err
66+
return pmetric.Metrics{}, s.resetClientOnError(err)
6567
}
6668

6769
scrapeErrors := scrapererror.ScrapeErrors{}
@@ -184,3 +186,15 @@ func createTopicsScraper(_ context.Context, cfg Config, settings receiver.Settin
184186
scraper.WithShutdown(s.shutdown),
185187
)
186188
}
189+
190+
func (s *topicScraper) resetClientOnError(err error) error {
191+
if isRecoverableError(err) {
192+
s.mu.Lock()
193+
defer s.mu.Unlock()
194+
s.client.Close()
195+
s.client = nil
196+
return fmt.Errorf("closing client because of reconnection error %w", err)
197+
}
198+
199+
return err
200+
}

receiver/kafkametricsreceiver/topic_scraper_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func TestTopicScraper_scrapes(t *testing.T) {
109109
config: *config,
110110
topicFilter: match,
111111
}
112+
client.Mock.On("Closed").Return(false)
112113
require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost()))
113114
md, err := scraper.scrape(t.Context())
114115
assert.NoError(t, err)
@@ -153,6 +154,7 @@ func TestTopicScraper_scrape_handlesTopicError(t *testing.T) {
153154
settings: receivertest.NewNopSettings(metadata.Type),
154155
topicFilter: match,
155156
}
157+
client.Mock.On("Closed").Return(false)
156158
_, err := scraper.scrape(t.Context())
157159
assert.Error(t, err)
158160
}
@@ -168,6 +170,7 @@ func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) {
168170
settings: receivertest.NewNopSettings(metadata.Type),
169171
topicFilter: match,
170172
}
173+
client.Mock.On("Closed").Return(false)
171174
require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost()))
172175
_, err := scraper.scrape(t.Context())
173176
assert.Error(t, err)
@@ -187,6 +190,7 @@ func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) {
187190
settings: receivertest.NewNopSettings(metadata.Type),
188191
topicFilter: match,
189192
}
193+
client.Mock.On("Closed").Return(false)
190194
require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost()))
191195
_, err := scraper.scrape(t.Context())
192196
assert.Error(t, err)

0 commit comments

Comments
 (0)