Skip to content

Commit a3ff02b

Browse files
Update receiverhelper for requests that failed to be received (#12802)
#### Description This is ongoing work to add: - In the endOp function of [ObsReport](receiver/receiverhelper/obsreport.go), errors are classified as either: - Downstream errors (refused) - when internal.IsDownstreamError(err) returns true - Internal errors - when the error is not a downstream error - New metrics otelcol_receiver_failed_spans, otelcol_receiver_failed_metric_points and otelcol_receiver_failed_log_records, recorded when an internal error (as classified above) is returned - Support for the otelcol_receiver_requests metric, which records the outcome of each request: - "success" if there's no error - "refused" if there's a downstream error - "failure" if there's an internal error #### Link to tracking issue Fixes #12207 #### Testing Only integration tests were added. #### Documentation See updated [receiver/receiverhelper/documentation.md](receiver/receiverhelper/documentation.md) --------- Signed-off-by: Andreas Gkizas <[email protected]> Co-authored-by: Jade Guiton <[email protected]>
1 parent 2e19689 commit a3ff02b

File tree

13 files changed

+984
-255
lines changed

13 files changed

+984
-255
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: receiverhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: New feature flag to make receiverhelper distinguish internal vs. downstream errors using new `otelcol_receiver_failed_x` and `otelcol_receiver_requests` metrics
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12207, 12802]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This is a breaking change for the semantics of the otelcol_receiver_refused_metric_points, otelcol_receiver_refused_log_records and otelcol_receiver_refused_spans metrics.
20+
These new metrics and semantics are enabled through the `receiverhelper.newReceiverMetrics` feature gate.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/otlpreceiver/otlp_test.go

Lines changed: 245 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ import (
4444
"go.opentelemetry.io/collector/internal/testutil"
4545
"go.opentelemetry.io/collector/pdata/plog"
4646
"go.opentelemetry.io/collector/pdata/pmetric"
47+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
4748
"go.opentelemetry.io/collector/pdata/pprofile"
4849
"go.opentelemetry.io/collector/pdata/ptrace"
4950
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
5051
"go.opentelemetry.io/collector/pdata/testdata"
5152
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata"
53+
"go.opentelemetry.io/collector/receiver/receiverhelper"
5254
"go.opentelemetry.io/collector/receiver/receivertest"
5355
)
5456

@@ -118,8 +120,8 @@ func TestJsonHttp(t *testing.T) {
118120
name: "Retryable GRPCError",
119121
encoding: "",
120122
contentType: "application/json",
121-
err: status.New(codes.Unavailable, "").Err(),
122-
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
123+
err: status.New(codes.Unavailable, "Service Unavailable").Err(),
124+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"},
123125
expectedStatusCode: http.StatusServiceUnavailable,
124126
},
125127
}
@@ -145,7 +147,8 @@ func TestJsonHttp(t *testing.T) {
145147
errStatus := &spb.Status{}
146148
require.NoError(t, json.Unmarshal(respBytes, errStatus))
147149
if s, ok := status.FromError(tt.err); ok {
148-
assert.True(t, proto.Equal(errStatus, s.Proto()))
150+
assert.Equal(t, s.Proto().Code, errStatus.Code)
151+
assert.Equal(t, s.Proto().Message, errStatus.Message)
149152
} else {
150153
fmt.Println(errStatus)
151154
assert.True(t, proto.Equal(errStatus, tt.expectedStatus))
@@ -365,15 +368,15 @@ func TestProtoHttp(t *testing.T) {
365368
{
366369
name: "Permanent GRPCError",
367370
encoding: "",
368-
err: status.New(codes.InvalidArgument, "").Err(),
369-
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""},
371+
err: status.New(codes.InvalidArgument, "Bad Request").Err(),
372+
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Bad Request"},
370373
expectedStatusCode: http.StatusBadRequest,
371374
},
372375
{
373376
name: "Retryable GRPCError",
374377
encoding: "",
375-
err: status.New(codes.Unavailable, "").Err(),
376-
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""},
378+
err: status.New(codes.Unavailable, "Service Unavailable").Err(),
379+
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"},
377380
expectedStatusCode: http.StatusServiceUnavailable,
378381
},
379382
}
@@ -553,6 +556,45 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
553556
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
554557
}
555558

559+
// TestOTLPReceiverGRPCMetricsIngestTest checks that the metrics receiver
560+
// is returning the proper response (return and metrics) when the next consumer
561+
// in the pipeline reports an error.
562+
func TestOTLPReceiverGRPCMetricsIngestTest(t *testing.T) {
563+
// Get a new available port
564+
addr := testutil.GetAvailableLocalAddress(t)
565+
566+
// Create a sink
567+
sink := &errOrSinkConsumer{MetricsSink: new(consumertest.MetricsSink)}
568+
569+
// Create a telemetry instance
570+
tt := componenttest.NewTelemetry()
571+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
572+
// Create telemetry settings
573+
settings := tt.NewTelemetrySettings()
574+
575+
recv := newGRPCReceiver(t, settings, addr, sink)
576+
require.NotNil(t, recv)
577+
require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost()))
578+
t.Cleanup(func() { require.NoError(t, recv.Shutdown(context.Background())) })
579+
580+
cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
581+
require.NoError(t, err)
582+
defer func() {
583+
assert.NoError(t, cc.Close())
584+
}()
585+
// Set up the error case
586+
sink.SetConsumeError(errors.New("consumer error"))
587+
588+
md := testdata.GenerateMetrics(1)
589+
_, err = pmetricotlp.NewGRPCClient(cc).Export(context.Background(), pmetricotlp.NewExportRequestFromMetrics(md))
590+
errStatus, ok := status.FromError(err)
591+
require.True(t, ok)
592+
assert.Equal(t, codes.Unavailable, errStatus.Code())
593+
594+
// Assert receiver metrics including receiver_requests
595+
assertReceiverMetrics(t, tt, otlpReceiverID, "grpc", 0, 2)
596+
}
597+
556598
// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver
557599
// is returning the proper response (return and metrics) when the next consumer
558600
// in the pipeline reports error. The test changes the responses returned by the
@@ -1262,8 +1304,41 @@ func (esc *errOrSinkConsumer) checkData(t *testing.T, data any, dataLen int) {
12621304
}
12631305
}
12641306

1265-
func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, refused int64) {
1266-
got, err := tt.GetMetric("otelcol_receiver_accepted_spans")
1307+
func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) {
1308+
var refused, failed int64
1309+
var outcome string
1310+
gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled()
1311+
// The errors in the OTLP tests are not downstream, so they should be "failed" when the gate is enabled.
1312+
if gateEnabled {
1313+
failed = rejected
1314+
outcome = "failure"
1315+
} else {
1316+
// When the gate is disabled, all errors are "refused".
1317+
refused = rejected
1318+
}
1319+
1320+
got, err := tt.GetMetric("otelcol_receiver_failed_spans")
1321+
require.NoError(t, err)
1322+
metricdatatest.AssertEqual(t,
1323+
metricdata.Metrics{
1324+
Name: "otelcol_receiver_failed_spans",
1325+
Description: "The number of spans that failed to be processed by the receiver due to internal errors. [alpha]",
1326+
Unit: "{spans}",
1327+
Data: metricdata.Sum[int64]{
1328+
Temporality: metricdata.CumulativeTemporality,
1329+
IsMonotonic: true,
1330+
DataPoints: []metricdata.DataPoint[int64]{
1331+
{
1332+
Attributes: attribute.NewSet(
1333+
attribute.String("receiver", id.String()),
1334+
attribute.String("transport", transport)),
1335+
Value: failed,
1336+
},
1337+
},
1338+
},
1339+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1340+
1341+
got, err = tt.GetMetric("otelcol_receiver_accepted_spans")
12671342
require.NoError(t, err)
12681343
metricdatatest.AssertEqual(t,
12691344
metricdata.Metrics{
@@ -1304,4 +1379,165 @@ func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id componen
13041379
},
13051380
},
13061381
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1382+
1383+
// Assert receiver_requests metric
1384+
if gateEnabled {
1385+
got, err := tt.GetMetric("otelcol_receiver_requests")
1386+
require.NoError(t, err)
1387+
1388+
// Calculate expected requests based on accepted and rejected counts
1389+
var expectedRequests []metricdata.DataPoint[int64]
1390+
if accepted > 0 {
1391+
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{
1392+
Attributes: attribute.NewSet(
1393+
attribute.String("receiver", id.String()),
1394+
attribute.String("transport", transport),
1395+
attribute.String("outcome", "success")),
1396+
Value: accepted,
1397+
})
1398+
}
1399+
if rejected > 0 {
1400+
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{
1401+
Attributes: attribute.NewSet(
1402+
attribute.String("receiver", id.String()),
1403+
attribute.String("transport", transport),
1404+
attribute.String("outcome", outcome)),
1405+
Value: rejected,
1406+
})
1407+
}
1408+
1409+
metricdatatest.AssertEqual(t,
1410+
metricdata.Metrics{
1411+
Name: "otelcol_receiver_requests",
1412+
Description: "The number of requests performed.",
1413+
Unit: "{requests}",
1414+
Data: metricdata.Sum[int64]{
1415+
Temporality: metricdata.CumulativeTemporality,
1416+
IsMonotonic: true,
1417+
DataPoints: expectedRequests,
1418+
},
1419+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1420+
} else {
1421+
_, err := tt.GetMetric("otelcol_receiver_requests")
1422+
require.Error(t, err)
1423+
}
1424+
}
1425+
1426+
func assertReceiverMetrics(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) {
1427+
var refused, failed int64
1428+
var outcome string
1429+
gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled()
1430+
// The error used in the metrics test is not downstream.
1431+
if gateEnabled {
1432+
failed = rejected
1433+
outcome = "failure"
1434+
} else {
1435+
// When the gate is disabled, all errors are "refused".
1436+
refused = rejected
1437+
}
1438+
1439+
got, err := tt.GetMetric("otelcol_receiver_failed_metric_points")
1440+
require.NoError(t, err)
1441+
metricdatatest.AssertEqual(t,
1442+
metricdata.Metrics{
1443+
Name: "otelcol_receiver_failed_metric_points",
1444+
Description: "The number of metric points that failed to be processed by the receiver due to internal errors. [alpha]",
1445+
Unit: "{datapoints}",
1446+
Data: metricdata.Sum[int64]{
1447+
Temporality: metricdata.CumulativeTemporality,
1448+
IsMonotonic: true,
1449+
DataPoints: []metricdata.DataPoint[int64]{
1450+
{
1451+
Attributes: attribute.NewSet(
1452+
attribute.String("receiver", id.String()),
1453+
attribute.String("transport", transport)),
1454+
Value: failed,
1455+
},
1456+
},
1457+
},
1458+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1459+
1460+
got, err = tt.GetMetric("otelcol_receiver_accepted_metric_points")
1461+
require.NoError(t, err)
1462+
metricdatatest.AssertEqual(t,
1463+
metricdata.Metrics{
1464+
Name: "otelcol_receiver_accepted_metric_points",
1465+
Description: "Number of metric points successfully pushed into the pipeline. [alpha]",
1466+
Unit: "{datapoints}",
1467+
Data: metricdata.Sum[int64]{
1468+
Temporality: metricdata.CumulativeTemporality,
1469+
IsMonotonic: true,
1470+
DataPoints: []metricdata.DataPoint[int64]{
1471+
{
1472+
Attributes: attribute.NewSet(
1473+
attribute.String("receiver", id.String()),
1474+
attribute.String("transport", transport)),
1475+
Value: accepted,
1476+
},
1477+
},
1478+
},
1479+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1480+
1481+
got, err = tt.GetMetric("otelcol_receiver_refused_metric_points")
1482+
require.NoError(t, err)
1483+
metricdatatest.AssertEqual(t,
1484+
metricdata.Metrics{
1485+
Name: "otelcol_receiver_refused_metric_points",
1486+
Description: "Number of metric points that could not be pushed into the pipeline. [alpha]",
1487+
Unit: "{datapoints}",
1488+
Data: metricdata.Sum[int64]{
1489+
Temporality: metricdata.CumulativeTemporality,
1490+
IsMonotonic: true,
1491+
DataPoints: []metricdata.DataPoint[int64]{
1492+
{
1493+
Attributes: attribute.NewSet(
1494+
attribute.String("receiver", id.String()),
1495+
attribute.String("transport", transport)),
1496+
Value: refused,
1497+
},
1498+
},
1499+
},
1500+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1501+
1502+
// Assert receiver_requests metric
1503+
if gateEnabled {
1504+
got, err := tt.GetMetric("otelcol_receiver_requests")
1505+
require.NoError(t, err)
1506+
1507+
// Calculate expected requests based on accepted and rejected counts
1508+
var expectedRequests []metricdata.DataPoint[int64]
1509+
if accepted > 0 {
1510+
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{
1511+
Attributes: attribute.NewSet(
1512+
attribute.String("receiver", id.String()),
1513+
attribute.String("transport", transport),
1514+
attribute.String("outcome", "success")),
1515+
Value: accepted,
1516+
})
1517+
}
1518+
if rejected > 0 {
1519+
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{
1520+
Attributes: attribute.NewSet(
1521+
attribute.String("receiver", id.String()),
1522+
attribute.String("transport", transport),
1523+
attribute.String("outcome", outcome)),
1524+
Value: 1, // One request failed
1525+
})
1526+
}
1527+
1528+
metricdatatest.AssertEqual(t,
1529+
metricdata.Metrics{
1530+
Name: "otelcol_receiver_requests",
1531+
Description: "The number of requests performed.",
1532+
Unit: "{requests}",
1533+
Data: metricdata.Sum[int64]{
1534+
Temporality: metricdata.CumulativeTemporality,
1535+
IsMonotonic: true,
1536+
DataPoints: expectedRequests,
1537+
},
1538+
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
1539+
} else {
1540+
_, err := tt.GetMetric("otelcol_receiver_requests")
1541+
require.Error(t, err)
1542+
}
13071543
}

receiver/otlpreceiver/otlphttp_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.opentelemetry.io/collector/component/componenttest"
2424
"go.opentelemetry.io/collector/internal/testutil"
2525
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
26+
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
2627
)
2728

2829
func TestHttpRetryAfter(t *testing.T) {
@@ -126,7 +127,10 @@ func TestHttpRetryAfter(t *testing.T) {
126127
} else {
127128
errStatus := &spb.Status{}
128129
require.NoError(t, proto.Unmarshal(respBytes, errStatus))
129-
s, ok := status.FromError(tt.err)
130+
// The HTTP receiver transforms errors through GetStatusFromError
131+
// We need to get the expected transformed error, not the original
132+
expectedErr := errors.GetStatusFromError(tt.err)
133+
s, ok := status.FromError(expectedErr)
130134
require.True(t, ok)
131135
assert.True(t, proto.Equal(errStatus, s.Proto()))
132136
}

0 commit comments

Comments
 (0)