Skip to content

Commit 59142cc

Browse files
author
Alex Boten
committed
[exporterhelper] make enqueue failures available
These metrics were only exporter either via OC or via the prometheus exporter. Fixes #8673 Signed-off-by: Alex Boten <[email protected]>
1 parent d7b49df commit 59142cc

File tree

7 files changed

+107
-95
lines changed

7 files changed

+107
-95
lines changed

exporter/exporterhelper/common.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
4040
b.nextSender = nextSender
4141
}
4242

43-
type obsrepSenderFactory func(obsrep *obsExporter) requestSender
43+
type obsrepSenderFactory func(obsrep *ObsReport) requestSender
4444

4545
// baseRequest is a base implementation for the internal.Request.
4646
type baseRequest struct {
@@ -143,7 +143,7 @@ type baseExporter struct {
143143
signal component.DataType
144144

145145
set exporter.CreateSettings
146-
obsrep *obsExporter
146+
obsrep *ObsReport
147147

148148
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
149149
// The data is handled by each sender in the respective order starting from the queueSender.
@@ -163,7 +163,7 @@ type baseExporter struct {
163163
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
164164
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
165165

166-
obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
166+
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
167167
if err != nil {
168168
return nil, err
169169
}
@@ -175,12 +175,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
175175
signal: signal,
176176

177177
queueSender: &baseRequestSender{},
178-
obsrepSender: osf(obsrep),
178+
obsrepSender: osf(obsReport),
179179
retrySender: &baseRequestSender{},
180180
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},
181181

182182
set: set,
183-
obsrep: obsrep,
183+
obsrep: obsReport,
184184
}
185185

186186
for _, op := range options {

exporter/exporterhelper/logs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewLogsExporter(
9999
req := newLogsRequest(ctx, ld, pusher)
100100
serr := be.send(req)
101101
if errors.Is(serr, errSendingQueueIsFull) {
102-
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
102+
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count()))
103103
}
104104
return serr
105105
}, be.consumerOptions...)
@@ -151,7 +151,7 @@ func NewLogsRequestExporter(
151151
r := newRequest(ctx, req)
152152
sErr := be.send(r)
153153
if errors.Is(sErr, errSendingQueueIsFull) {
154-
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
154+
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeLogs, int64(r.Count()))
155155
}
156156
return sErr
157157
}, be.consumerOptions...)
@@ -164,10 +164,10 @@ func NewLogsRequestExporter(
164164

165165
type logsExporterWithObservability struct {
166166
baseRequestSender
167-
obsrep *obsExporter
167+
obsrep *ObsReport
168168
}
169169

170-
func newLogsExporterWithObservability(obsrep *obsExporter) requestSender {
170+
func newLogsExporterWithObservability(obsrep *ObsReport) requestSender {
171171
return &logsExporterWithObservability{obsrep: obsrep}
172172
}
173173

exporter/exporterhelper/metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewMetricsExporter(
9999
req := newMetricsRequest(ctx, md, pusher)
100100
serr := be.send(req)
101101
if errors.Is(serr, errSendingQueueIsFull) {
102-
be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count()))
102+
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeMetrics, int64(req.Count()))
103103
}
104104
return serr
105105
}, be.consumerOptions...)
@@ -151,7 +151,7 @@ func NewMetricsRequestExporter(
151151
r := newRequest(ctx, req)
152152
sErr := be.send(r)
153153
if errors.Is(sErr, errSendingQueueIsFull) {
154-
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
154+
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count()))
155155
}
156156
return sErr
157157
}, be.consumerOptions...)
@@ -164,10 +164,10 @@ func NewMetricsRequestExporter(
164164

165165
type metricsSenderWithObservability struct {
166166
baseRequestSender
167-
obsrep *obsExporter
167+
obsrep *ObsReport
168168
}
169169

170-
func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender {
170+
func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender {
171171
return &metricsSenderWithObservability{obsrep: obsrep}
172172
}
173173

exporter/exporterhelper/obsexporter.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@ type ObsReport struct {
3434
tracer trace.Tracer
3535
logger *zap.Logger
3636

37-
useOtelForMetrics bool
38-
otelAttrs []attribute.KeyValue
39-
sentSpans metric.Int64Counter
40-
failedToSendSpans metric.Int64Counter
41-
sentMetricPoints metric.Int64Counter
42-
failedToSendMetricPoints metric.Int64Counter
43-
sentLogRecords metric.Int64Counter
44-
failedToSendLogRecords metric.Int64Counter
37+
useOtelForMetrics bool
38+
otelAttrs []attribute.KeyValue
39+
sentSpans metric.Int64Counter
40+
failedToSendSpans metric.Int64Counter
41+
failedToEnqueueSpans metric.Int64Counter
42+
sentMetricPoints metric.Int64Counter
43+
failedToSendMetricPoints metric.Int64Counter
44+
failedToEnqueueMetricPoints metric.Int64Counter
45+
sentLogRecords metric.Int64Counter
46+
failedToSendLogRecords metric.Int64Counter
47+
failedToEnqueueLogRecords metric.Int64Counter
4548
}
4649

4750
// ObsReportSettings are settings for creating an ObsReport.
@@ -96,6 +99,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
9699
metric.WithUnit("1"))
97100
errors = multierr.Append(errors, err)
98101

102+
or.failedToEnqueueSpans, err = meter.Int64Counter(
103+
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueSpansKey,
104+
metric.WithDescription("Number of spans failed to be added to the sending queue."),
105+
metric.WithUnit("1"))
106+
errors = multierr.Append(errors, err)
107+
99108
or.sentMetricPoints, err = meter.Int64Counter(
100109
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
101110
metric.WithDescription("Number of metric points successfully sent to destination."),
@@ -108,6 +117,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
108117
metric.WithUnit("1"))
109118
errors = multierr.Append(errors, err)
110119

120+
or.failedToEnqueueMetricPoints, err = meter.Int64Counter(
121+
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueMetricPointsKey,
122+
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
123+
metric.WithUnit("1"))
124+
errors = multierr.Append(errors, err)
125+
111126
or.sentLogRecords, err = meter.Int64Counter(
112127
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
113128
metric.WithDescription("Number of log record successfully sent to destination."),
@@ -120,6 +135,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
120135
metric.WithUnit("1"))
121136
errors = multierr.Append(errors, err)
122137

138+
or.failedToEnqueueLogRecords, err = meter.Int64Counter(
139+
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueLogRecordsKey,
140+
metric.WithDescription("Number of log records failed to be added to the sending queue."),
141+
metric.WithUnit("1"))
142+
errors = multierr.Append(errors, err)
143+
123144
return errors
124145
}
125146

@@ -252,3 +273,43 @@ func toNumItems(numExportedItems int, err error) (int64, int64) {
252273
}
253274
return int64(numExportedItems), 0
254275
}
276+
277+
func (or *ObsReport) recordEnqueueFailure(ctx context.Context, dataType component.DataType, failed int64) {
278+
if or.useOtelForMetrics {
279+
or.recordEnqueueFailureWithOtel(ctx, dataType, failed)
280+
} else {
281+
or.recordEnqueueFailureWithOC(ctx, dataType, failed)
282+
}
283+
}
284+
285+
func (or *ObsReport) recordEnqueueFailureWithOC(ctx context.Context, dataType component.DataType, failed int64) {
286+
var failedMeasure *stats.Int64Measure
287+
switch dataType {
288+
case component.DataTypeTraces:
289+
failedMeasure = obsmetrics.ExporterFailedToSendSpans
290+
case component.DataTypeMetrics:
291+
failedMeasure = obsmetrics.ExporterFailedToSendMetricPoints
292+
case component.DataTypeLogs:
293+
failedMeasure = obsmetrics.ExporterFailedToSendLogRecords
294+
}
295+
if failed > 0 {
296+
_ = stats.RecordWithTags(
297+
ctx,
298+
or.mutators,
299+
failedMeasure.M(failed))
300+
}
301+
}
302+
303+
func (or *ObsReport) recordEnqueueFailureWithOtel(ctx context.Context, dataType component.DataType, failed int64) {
304+
var enqueueFailedMeasure metric.Int64Counter
305+
switch dataType {
306+
case component.DataTypeTraces:
307+
enqueueFailedMeasure = or.failedToEnqueueSpans
308+
case component.DataTypeMetrics:
309+
enqueueFailedMeasure = or.failedToEnqueueMetricPoints
310+
case component.DataTypeLogs:
311+
enqueueFailedMeasure = or.failedToEnqueueLogRecords
312+
}
313+
314+
enqueueFailedMeasure.Add(ctx, failed, metric.WithAttributes(or.otelAttrs...))
315+
}

exporter/exporterhelper/obsreport.go

Lines changed: 3 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
55

66
import (
7-
"context"
8-
97
"go.opencensus.io/metric"
108
"go.opencensus.io/metric/metricdata"
119
"go.opencensus.io/metric/metricproducer"
@@ -26,12 +24,9 @@ func init() {
2624
}
2725

2826
type instruments struct {
29-
registry *metric.Registry
30-
queueSize *metric.Int64DerivedGauge
31-
queueCapacity *metric.Int64DerivedGauge
32-
failedToEnqueueTraceSpans *metric.Int64Cumulative
33-
failedToEnqueueMetricPoints *metric.Int64Cumulative
34-
failedToEnqueueLogRecords *metric.Int64Cumulative
27+
registry *metric.Registry
28+
queueSize *metric.Int64DerivedGauge
29+
queueCapacity *metric.Int64DerivedGauge
3530
}
3631

3732
func newInstruments(registry *metric.Registry) *instruments {
@@ -49,67 +44,5 @@ func newInstruments(registry *metric.Registry) *instruments {
4944
metric.WithDescription("Fixed capacity of the retry queue (in batches)"),
5045
metric.WithLabelKeys(obsmetrics.ExporterKey),
5146
metric.WithUnit(metricdata.UnitDimensionless))
52-
53-
insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative(
54-
obsmetrics.ExporterKey+"/enqueue_failed_spans",
55-
metric.WithDescription("Number of spans failed to be added to the sending queue."),
56-
metric.WithLabelKeys(obsmetrics.ExporterKey),
57-
metric.WithUnit(metricdata.UnitDimensionless))
58-
59-
insts.failedToEnqueueMetricPoints, _ = registry.AddInt64Cumulative(
60-
obsmetrics.ExporterKey+"/enqueue_failed_metric_points",
61-
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
62-
metric.WithLabelKeys(obsmetrics.ExporterKey),
63-
metric.WithUnit(metricdata.UnitDimensionless))
64-
65-
insts.failedToEnqueueLogRecords, _ = registry.AddInt64Cumulative(
66-
obsmetrics.ExporterKey+"/enqueue_failed_log_records",
67-
metric.WithDescription("Number of log records failed to be added to the sending queue."),
68-
metric.WithLabelKeys(obsmetrics.ExporterKey),
69-
metric.WithUnit(metricdata.UnitDimensionless))
70-
7147
return insts
7248
}
73-
74-
// obsExporter is a helper to add observability to an exporter.
75-
type obsExporter struct {
76-
*ObsReport
77-
failedToEnqueueTraceSpansEntry *metric.Int64CumulativeEntry
78-
failedToEnqueueMetricPointsEntry *metric.Int64CumulativeEntry
79-
failedToEnqueueLogRecordsEntry *metric.Int64CumulativeEntry
80-
}
81-
82-
// newObsExporter creates a new observability exporter.
83-
func newObsExporter(cfg ObsReportSettings, insts *instruments) (*obsExporter, error) {
84-
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
85-
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
86-
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
87-
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)
88-
89-
exp, err := NewObsReport(cfg)
90-
if err != nil {
91-
return nil, err
92-
}
93-
94-
return &obsExporter{
95-
ObsReport: exp,
96-
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
97-
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
98-
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
99-
}, nil
100-
}
101-
102-
// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
103-
func (eor *obsExporter) recordTracesEnqueueFailure(_ context.Context, numSpans int64) {
104-
eor.failedToEnqueueTraceSpansEntry.Inc(numSpans)
105-
}
106-
107-
// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
108-
func (eor *obsExporter) recordMetricsEnqueueFailure(_ context.Context, numMetricPoints int64) {
109-
eor.failedToEnqueueMetricPointsEntry.Inc(numMetricPoints)
110-
}
111-
112-
// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
113-
func (eor *obsExporter) recordLogsEnqueueFailure(_ context.Context, numLogRecords int64) {
114-
eor.failedToEnqueueLogRecordsEntry.Inc(numLogRecords)
115-
}

exporter/exporterhelper/traces.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewTracesExporter(
9999
req := newTracesRequest(ctx, td, pusher)
100100
serr := be.send(req)
101101
if errors.Is(serr, errSendingQueueIsFull) {
102-
be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count()))
102+
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeTraces, int64(req.Count()))
103103
}
104104
return serr
105105
}, be.consumerOptions...)
@@ -151,7 +151,7 @@ func NewTracesRequestExporter(
151151
r := newRequest(ctx, req)
152152
sErr := be.send(r)
153153
if errors.Is(sErr, errSendingQueueIsFull) {
154-
be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
154+
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeTraces, int64(r.Count()))
155155
}
156156
return sErr
157157
}, be.consumerOptions...)
@@ -164,10 +164,10 @@ func NewTracesRequestExporter(
164164

165165
type tracesExporterWithObservability struct {
166166
baseRequestSender
167-
obsrep *obsExporter
167+
obsrep *ObsReport
168168
}
169169

170-
func newTracesExporterWithObservability(obsrep *obsExporter) requestSender {
170+
func newTracesExporterWithObservability(obsrep *ObsReport) requestSender {
171171
return &tracesExporterWithObservability{obsrep: obsrep}
172172
}
173173

internal/obsreportconfig/obsmetrics/obs_exporter.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,22 @@ const (
1616
SentSpansKey = "sent_spans"
1717
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
1818
FailedToSendSpansKey = "send_failed_spans"
19+
// FailedToEnqueueSpansKey used to track spans that failed to be enqueued by exporters.
20+
FailedToEnqueueSpansKey = "enqueue_failed_spans"
1921

2022
// SentMetricPointsKey used to track metric points sent by exporters.
2123
SentMetricPointsKey = "sent_metric_points"
2224
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
2325
FailedToSendMetricPointsKey = "send_failed_metric_points"
26+
// FailedToEnqueueMetricPointsKey used to track metric points that failed to be enqueued by exporters.
27+
FailedToEnqueueMetricPointsKey = "enqueue_failed_metric_points"
2428

2529
// SentLogRecordsKey used to track logs sent by exporters.
2630
SentLogRecordsKey = "sent_log_records"
2731
// FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters.
2832
FailedToSendLogRecordsKey = "send_failed_log_records"
33+
// FailedToEnqueueLogRecordsKey used to track logs that failed to be enqueued by exporters.
34+
FailedToEnqueueLogRecordsKey = "enqueue_failed_log_records"
2935
)
3036

3137
var (
@@ -49,6 +55,10 @@ var (
4955
ExporterPrefix+FailedToSendSpansKey,
5056
"Number of spans in failed attempts to send to destination.",
5157
stats.UnitDimensionless)
58+
ExporterFailedToEnqueueSpans = stats.Int64(
59+
ExporterPrefix+FailedToEnqueueSpansKey,
60+
"Number of spans failed to be added to the sending queue.",
61+
stats.UnitDimensionless)
5262
ExporterSentMetricPoints = stats.Int64(
5363
ExporterPrefix+SentMetricPointsKey,
5464
"Number of metric points successfully sent to destination.",
@@ -57,6 +67,10 @@ var (
5767
ExporterPrefix+FailedToSendMetricPointsKey,
5868
"Number of metric points in failed attempts to send to destination.",
5969
stats.UnitDimensionless)
70+
ExporterFailedToEnqueueMetricPoints = stats.Int64(
71+
ExporterPrefix+FailedToEnqueueMetricPointsKey,
72+
"Number of metric points failed to be added to the sending queue.",
73+
stats.UnitDimensionless)
6074
ExporterSentLogRecords = stats.Int64(
6175
ExporterPrefix+SentLogRecordsKey,
6276
"Number of log record successfully sent to destination.",
@@ -65,4 +79,8 @@ var (
6579
ExporterPrefix+FailedToSendLogRecordsKey,
6680
"Number of log records in failed attempts to send to destination.",
6781
stats.UnitDimensionless)
82+
ExporterFailedToEnqueueLogRecords = stats.Int64(
83+
ExporterPrefix+FailedToEnqueueLogRecordsKey,
84+
"Number of log records failed to be added to the sending queue.",
85+
stats.UnitDimensionless)
6886
)

0 commit comments

Comments
 (0)