Skip to content

Commit e6c14a5

Browse files
authored
[exporter/elasticsearch] Log and drop invalid metrics instead of returning error to avoid upstream retries (open-telemetry#35740)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Log metrics validation error instead of returning to avoid upstream retries <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent fb114a6 commit e6c14a5

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Log and drop invalid metrics instead of returning error to avoid upstream retries
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35740]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.opentelemetry.io/collector/pdata/plog"
1919
"go.opentelemetry.io/collector/pdata/pmetric"
2020
"go.opentelemetry.io/collector/pdata/ptrace"
21+
"go.uber.org/zap"
2122

2223
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
2324
)
@@ -187,7 +188,10 @@ func (e *elasticsearchExporter) pushMetricsData(
187188
}
188189
defer session.End()
189190

190-
var errs []error
191+
var (
192+
validationErrs []error // log instead of returning these so that upstream does not retry
193+
errs []error
194+
)
191195
resourceMetrics := metrics.ResourceMetrics()
192196
for i := 0; i < resourceMetrics.Len(); i++ {
193197
resourceMetric := resourceMetrics.At(i)
@@ -224,7 +228,7 @@ func (e *elasticsearchExporter) pushMetricsData(
224228
for l := 0; l < dps.Len(); l++ {
225229
dp := dps.At(l)
226230
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
227-
errs = append(errs, err)
231+
validationErrs = append(validationErrs, err)
228232
continue
229233
}
230234
}
@@ -233,33 +237,33 @@ func (e *elasticsearchExporter) pushMetricsData(
233237
for l := 0; l < dps.Len(); l++ {
234238
dp := dps.At(l)
235239
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
236-
errs = append(errs, err)
240+
validationErrs = append(validationErrs, err)
237241
continue
238242
}
239243
}
240244
case pmetric.MetricTypeExponentialHistogram:
241245
if metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
242-
errs = append(errs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
246+
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
243247
continue
244248
}
245249
dps := metric.ExponentialHistogram().DataPoints()
246250
for l := 0; l < dps.Len(); l++ {
247251
dp := dps.At(l)
248252
if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
249-
errs = append(errs, err)
253+
validationErrs = append(validationErrs, err)
250254
continue
251255
}
252256
}
253257
case pmetric.MetricTypeHistogram:
254258
if metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
255-
errs = append(errs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
259+
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
256260
continue
257261
}
258262
dps := metric.Histogram().DataPoints()
259263
for l := 0; l < dps.Len(); l++ {
260264
dp := dps.At(l)
261265
if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
262-
errs = append(errs, err)
266+
validationErrs = append(validationErrs, err)
263267
continue
264268
}
265269
}
@@ -268,14 +272,18 @@ func (e *elasticsearchExporter) pushMetricsData(
268272
for l := 0; l < dps.Len(); l++ {
269273
dp := dps.At(l)
270274
if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
271-
errs = append(errs, err)
275+
validationErrs = append(validationErrs, err)
272276
continue
273277
}
274278
}
275279
}
276280
}
277281
}
278282

283+
if len(validationErrs) > 0 {
284+
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
285+
}
286+
279287
for fIndex, docs := range resourceDocs {
280288
for _, doc := range docs {
281289
var (

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ func TestExporterMetrics(t *testing.T) {
890890
fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
891891

892892
err := exporter.ConsumeMetrics(context.Background(), metrics)
893-
assert.ErrorContains(t, err, "dropping cumulative temporality histogram \"metric.foo\"")
893+
assert.NoError(t, err)
894894
})
895895

896896
t.Run("publish exponential histogram cumulative temporality", func(t *testing.T) {
@@ -921,7 +921,7 @@ func TestExporterMetrics(t *testing.T) {
921921
fooDp.Negative().BucketCounts().FromRaw([]uint64{1, 0, 0, 1})
922922

923923
err := exporter.ConsumeMetrics(context.Background(), metrics)
924-
assert.ErrorContains(t, err, "dropping cumulative temporality exponential histogram \"metric.foo\"")
924+
assert.NoError(t, err)
925925
})
926926

927927
t.Run("publish only valid data points", func(t *testing.T) {
@@ -960,8 +960,7 @@ func TestExporterMetrics(t *testing.T) {
960960
barOtherDp.SetDoubleValue(1.0)
961961

962962
err := exporter.ConsumeMetrics(context.Background(), metrics)
963-
require.ErrorContains(t, err, "invalid histogram data point")
964-
require.ErrorContains(t, err, "invalid number data point")
963+
assert.NoError(t, err)
965964

966965
rec.WaitItems(2)
967966

0 commit comments

Comments
 (0)