-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Update receiverhelper for requests that failed to be received #12802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 89 commits
ca361bb
6ee5ec8
83b9462
b926b14
1803e33
d729b2c
81c6f86
140fc8f
a6db9e1
45ab725
85bcabb
8a2116c
6f013a7
61e3468
b9e788d
0019011
d4c452a
143b04c
1c151b4
cd2bb19
0194ac5
ad79665
29e209d
2928c47
ed48254
1053f75
fcc1943
5b0be26
9294a16
156f422
0bb9732
cb2796a
fdb2762
d20d19d
165b076
79cc00c
e475745
3decea5
4846daf
0d3e4b1
666c5b2
a6f7dc7
2c6b080
53992b5
843ef40
0018f77
c3439a8
cd6e640
75e3e7f
d9eebb1
515e4d5
0c4b977
eb93ce5
b76a51d
3ab59d3
d347bd1
8a4a26c
44b193e
579e532
250f1ba
0a38936
36acb84
08fb90d
702d48a
d7f9533
c82a466
a92c7a6
535d2ba
9b7c3f5
7f5d19f
b9543ac
52b302d
44c898c
1f2a196
6583a43
9a6cf12
3267177
041e3f6
3601749
638370f
73fd278
909a081
f0297ff
25ea91a
b3b8b87
dd3df23
89d7bc4
de8ed1b
24e4281
b77091f
89c07dd
7fc9765
a1c0105
8199a4b
ec402d5
4fd8bc6
fac2c12
a8a0b2d
5637ee6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) | ||
component: receiverhelper | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Update receiverhelper StartXOp and EndXOp methods to emit otelcol_receiver_refused_x and otelcol_receiver_failed_x in order to be able to distinguish internal vs downstream errors | ||
|
||
# One or more tracking issues or pull requests related to the change | ||
issues: [12207] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | | ||
This feature is enabled through a feature gate. Enable it with the command: | ||
otelcol --config=config.yaml --feature-gates=+receiverhelper.newReceiverMetrics | ||
|
||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,11 +44,13 @@ import ( | |
"go.opentelemetry.io/collector/internal/testutil" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" | ||
"go.opentelemetry.io/collector/pdata/pprofile" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" | ||
"go.opentelemetry.io/collector/pdata/testdata" | ||
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata" | ||
"go.opentelemetry.io/collector/receiver/receiverhelper" | ||
"go.opentelemetry.io/collector/receiver/receivertest" | ||
) | ||
|
||
|
@@ -118,8 +120,8 @@ func TestJsonHttp(t *testing.T) { | |
name: "Retryable GRPCError", | ||
encoding: "", | ||
contentType: "application/json", | ||
err: status.New(codes.Unavailable, "").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, | ||
err: status.New(codes.Unavailable, "Service Unavailable").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"}, | ||
expectedStatusCode: http.StatusServiceUnavailable, | ||
}, | ||
} | ||
|
@@ -145,7 +147,8 @@ func TestJsonHttp(t *testing.T) { | |
errStatus := &spb.Status{} | ||
require.NoError(t, json.Unmarshal(respBytes, errStatus)) | ||
if s, ok := status.FromError(tt.err); ok { | ||
assert.True(t, proto.Equal(errStatus, s.Proto())) | ||
assert.Equal(t, s.Proto().Code, errStatus.Code) | ||
assert.Equal(t, s.Proto().Message, errStatus.Message) | ||
} else { | ||
fmt.Println(errStatus) | ||
assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) | ||
|
@@ -365,15 +368,15 @@ func TestProtoHttp(t *testing.T) { | |
{ | ||
name: "Permanent GRPCError", | ||
encoding: "", | ||
err: status.New(codes.InvalidArgument, "").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, | ||
err: status.New(codes.InvalidArgument, "Bad Request").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Bad Request"}, | ||
expectedStatusCode: http.StatusBadRequest, | ||
}, | ||
{ | ||
name: "Retryable GRPCError", | ||
encoding: "", | ||
err: status.New(codes.Unavailable, "").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, | ||
err: status.New(codes.Unavailable, "Service Unavailable").Err(), | ||
expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"}, | ||
expectedStatusCode: http.StatusServiceUnavailable, | ||
}, | ||
} | ||
|
@@ -553,6 +556,45 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { | |
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) | ||
} | ||
|
||
// TestOTLPReceiverGRPCMetricsIngestTest checks that the metrics receiver | ||
// is returning the proper response (return and metrics) when the next consumer | ||
// in the pipeline reports an error. | ||
func TestOTLPReceiverGRPCMetricsIngestTest(t *testing.T) { | ||
func TestOTLPReceiverGRPCMetricsIngestTest(t *testing.T) { | ||
[Review comment] Was there a specific reason to add this new test for metrics? Do you expect the behavior to be different from [reference missing in the extracted page]?
[Reply] Not really, it just applies the same logic so that any changes in metrics are also caught. Let me know if you want me to remove it.
|
||
// Get a new available port | ||
addr := testutil.GetAvailableLocalAddress(t) | ||
|
||
// Create a sink | ||
sink := &errOrSinkConsumer{MetricsSink: new(consumertest.MetricsSink)} | ||
|
||
// Create a telemetry instance | ||
tt := componenttest.NewTelemetry() | ||
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) | ||
// Create telemetry settings | ||
settings := tt.NewTelemetrySettings() | ||
|
||
recv := newGRPCReceiver(t, settings, addr, sink) | ||
require.NotNil(t, recv) | ||
require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { require.NoError(t, recv.Shutdown(context.Background())) }) | ||
|
||
cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
require.NoError(t, err) | ||
defer func() { | ||
assert.NoError(t, cc.Close()) | ||
}() | ||
// Set up the error case | ||
sink.SetConsumeError(errors.New("consumer error")) | ||
|
||
md := testdata.GenerateMetrics(1) | ||
_, err = pmetricotlp.NewGRPCClient(cc).Export(context.Background(), pmetricotlp.NewExportRequestFromMetrics(md)) | ||
errStatus, ok := status.FromError(err) | ||
require.True(t, ok) | ||
assert.Equal(t, codes.Unavailable, errStatus.Code()) | ||
|
||
// Assert receiver metrics including receiver_requests | ||
assertReceiverMetrics(t, tt, otlpReceiverID, "grpc", 0, 2) | ||
} | ||
|
||
// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver | ||
// is returning the proper response (return and metrics) when the next consumer | ||
// in the pipeline reports error. The test changes the responses returned by the | ||
|
@@ -1262,8 +1304,40 @@ func (esc *errOrSinkConsumer) checkData(t *testing.T, data any, dataLen int) { | |
} | ||
} | ||
|
||
func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, refused int64) { | ||
got, err := tt.GetMetric("otelcol_receiver_accepted_spans") | ||
func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) { | ||
var refused, failed int64 | ||
var outcome string | ||
gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled() | ||
if gateEnabled { | ||
refused = rejected | ||
outcome = "refused" | ||
} else { | ||
failed = rejected | ||
outcome = "failure" | ||
} | ||
|
||
got, err := tt.GetMetric("otelcol_receiver_failed_spans") | ||
require.NoError(t, err) | ||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_failed_spans", | ||
Description: "The number of spans that failed to be processed by the receiver due to internal errors.", | ||
Unit: "{spans}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: []metricdata.DataPoint[int64]{ | ||
{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport)), | ||
Value: failed, | ||
}, | ||
}, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
|
||
got, err = tt.GetMetric("otelcol_receiver_accepted_spans") | ||
require.NoError(t, err) | ||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
|
@@ -1304,4 +1378,164 @@ func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id componen | |
}, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
|
||
// Assert receiver_requests metric | ||
if gateEnabled { | ||
got, err := tt.GetMetric("otelcol_receiver_requests") | ||
require.NoError(t, err) | ||
|
||
// Calculate expected requests based on accepted and refused counts | ||
var expectedRequests []metricdata.DataPoint[int64] | ||
if accepted > 0 { | ||
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport), | ||
attribute.String("outcome", "success")), | ||
Value: accepted, | ||
}) | ||
} | ||
if rejected > 0 { | ||
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport), | ||
attribute.String("outcome", outcome)), | ||
Value: rejected, | ||
}) | ||
} | ||
|
||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_requests", | ||
Description: "The number of requests performed.", | ||
Unit: "{requests}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: expectedRequests, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
} else { | ||
_, err := tt.GetMetric("otelcol_receiver_requests") | ||
require.Error(t, err) | ||
} | ||
} | ||
|
||
// assertReceiverMetrics checks the receiver metric-point telemetry recorded in tt | ||
// for the receiver identified by id and the given transport. It asserts, in order: | ||
//   - otelcol_receiver_failed_metric_points | ||
//   - otelcol_receiver_accepted_metric_points (expected value: accepted) | ||
//   - otelcol_receiver_refused_metric_points | ||
//   - otelcol_receiver_requests, which must exist only when | ||
//     receiverhelper.NewReceiverMetricsGate is enabled; otherwise looking it up | ||
//     must return an error. | ||
// | ||
// rejected is attributed to either the refused or the failed counter (never both), | ||
// depending on the branch taken below; the other counter is expected to be 0. | ||
func assertReceiverMetrics(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) { | ||
var refused, failed int64 | ||
var outcome string | ||
gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled() | ||
// NOTE(review): IsDownstream is evaluated on an error freshly created here, not on | ||
// the error the receiver actually saw. Presumably a plain errors.New value is never | ||
// reported as downstream, which would make the "refused" branch unreachable and | ||
// always select failed/"failure" -- confirm this is the intended expectation. | ||
if gateEnabled && consumererror.IsDownstream(errors.New("consumer error")) { | ||
refused = rejected | ||
outcome = "refused" | ||
} else { | ||
failed = rejected | ||
outcome = "failure" | ||
} | ||
|
||
// Failed metric points: expected to equal `failed` computed above. | ||
got, err := tt.GetMetric("otelcol_receiver_failed_metric_points") | ||
require.NoError(t, err) | ||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_failed_metric_points", | ||
Description: "The number of metric points that failed to be processed by the receiver due to internal errors.", | ||
Unit: "{datapoints}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: []metricdata.DataPoint[int64]{ | ||
{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport)), | ||
Value: failed, | ||
}, | ||
}, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
|
||
// Accepted metric points: expected to equal the caller-supplied `accepted`. | ||
got, err = tt.GetMetric("otelcol_receiver_accepted_metric_points") | ||
require.NoError(t, err) | ||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_accepted_metric_points", | ||
Description: "Number of metric points successfully pushed into the pipeline. [alpha]", | ||
Unit: "{datapoints}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: []metricdata.DataPoint[int64]{ | ||
{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport)), | ||
Value: accepted, | ||
}, | ||
}, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
|
||
// Refused metric points: expected to equal `refused` computed above. | ||
got, err = tt.GetMetric("otelcol_receiver_refused_metric_points") | ||
require.NoError(t, err) | ||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_refused_metric_points", | ||
Description: "Number of metric points that could not be pushed into the pipeline. [alpha]", | ||
Unit: "{datapoints}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: []metricdata.DataPoint[int64]{ | ||
{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport)), | ||
Value: refused, | ||
}, | ||
}, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
|
||
// Assert receiver_requests metric | ||
if gateEnabled { | ||
got, err := tt.GetMetric("otelcol_receiver_requests") | ||
require.NoError(t, err) | ||
|
||
// Calculate expected requests based on accepted and refused counts | ||
// A data point is expected only for outcomes whose count is non-zero. | ||
var expectedRequests []metricdata.DataPoint[int64] | ||
if accepted > 0 { | ||
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport), | ||
attribute.String("outcome", "success")), | ||
Value: accepted, | ||
}) | ||
} | ||
if rejected > 0 { | ||
expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ | ||
Attributes: attribute.NewSet( | ||
attribute.String("receiver", id.String()), | ||
attribute.String("transport", transport), | ||
attribute.String("outcome", outcome)), | ||
// NOTE(review): the request count is hard-coded to 1 regardless of `rejected`, | ||
// while the success data point above uses `accepted` as its count -- presumably | ||
// callers only ever send a single failing request; verify against callers. | ||
Value: 1, // One request failed | ||
}) | ||
} | ||
|
||
metricdatatest.AssertEqual(t, | ||
metricdata.Metrics{ | ||
Name: "otelcol_receiver_requests", | ||
Description: "The number of requests performed.", | ||
Unit: "{requests}", | ||
Data: metricdata.Sum[int64]{ | ||
Temporality: metricdata.CumulativeTemporality, | ||
IsMonotonic: true, | ||
DataPoints: expectedRequests, | ||
}, | ||
}, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) | ||
} else { | ||
// With the gate disabled, looking up the requests metric is expected to fail. | ||
_, err := tt.GetMetric("otelcol_receiver_requests") | ||
require.Error(t, err) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.