@@ -6,20 +6,15 @@ package processorhelper
6
6
import (
7
7
"context"
8
8
"errors"
9
- "strings"
10
9
"testing"
11
10
12
11
"github.com/stretchr/testify/assert"
13
12
"github.com/stretchr/testify/require"
14
13
"go.opentelemetry.io/otel/attribute"
15
- "go.opentelemetry.io/otel/metric"
16
- sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17
14
"go.opentelemetry.io/otel/sdk/metric/metricdata"
18
- "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
19
15
20
16
"go.opentelemetry.io/collector/component"
21
17
"go.opentelemetry.io/collector/component/componenttest"
22
- "go.opentelemetry.io/collector/config/configtelemetry"
23
18
"go.opentelemetry.io/collector/consumer"
24
19
"go.opentelemetry.io/collector/consumer/consumertest"
25
20
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -76,76 +71,60 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
76
71
}
77
72
78
73
func TestMetricsProcessor_RecordInOut (t * testing.T ) {
79
- // Regardless of how many data points are ingested, emit just one
74
+ // Regardless of how many data points are ingested, emit 3
80
75
mockAggregate := func (_ context.Context , _ pmetric.Metrics ) (pmetric.Metrics , error ) {
81
76
md := pmetric .NewMetrics ()
82
77
md .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ().SetEmptySum ().DataPoints ().AppendEmpty ()
78
+ md .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ().SetEmptySum ().DataPoints ().AppendEmpty ()
79
+ md .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ().SetEmptySum ().DataPoints ().AppendEmpty ()
83
80
return md , nil
84
81
}
85
82
86
83
incomingMetrics := pmetric .NewMetrics ()
87
84
dps := incomingMetrics .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ().SetEmptySum ().DataPoints ()
88
85
89
- // Add 3 data points to the incoming
90
- dps .AppendEmpty ()
86
+ // Add 2 data points to the incoming
91
87
dps .AppendEmpty ()
92
88
dps .AppendEmpty ()
93
89
94
- metricReader := sdkmetric .NewManualReader ()
95
- set := processortest .NewNopSettings ()
96
- set .TelemetrySettings .MetricsLevel = configtelemetry .LevelNormal
97
- set .TelemetrySettings .MetricsLevel = configtelemetry .LevelBasic
98
- set .TelemetrySettings .LeveledMeterProvider = func (level configtelemetry.Level ) metric.MeterProvider {
99
- if level >= configtelemetry .LevelBasic {
100
- return sdkmetric .NewMeterProvider (sdkmetric .WithReader (metricReader ))
101
- }
102
- return nil
103
- }
104
-
105
- mp , err := NewMetricsProcessor (context .Background (), set , & testMetricsCfg , consumertest .NewNop (), mockAggregate )
90
+ testTelemetry := setupTestTelemetry ()
91
+ mp , err := NewMetricsProcessor (context .Background (), testTelemetry .NewSettings (), & testMetricsCfg , consumertest .NewNop (), mockAggregate )
106
92
require .NoError (t , err )
107
93
108
94
assert .NoError (t , mp .Start (context .Background (), componenttest .NewNopHost ()))
109
95
assert .NoError (t , mp .ConsumeMetrics (context .Background (), incomingMetrics ))
110
96
assert .NoError (t , mp .Shutdown (context .Background ()))
111
97
112
- ownMetrics := new (metricdata.ResourceMetrics )
113
- require .NoError (t , metricReader .Collect (context .Background (), ownMetrics ))
114
-
115
- require .Len (t , ownMetrics .ScopeMetrics , 1 )
116
- require .Len (t , ownMetrics .ScopeMetrics [0 ].Metrics , 2 )
117
-
118
- inMetric := ownMetrics .ScopeMetrics [0 ].Metrics [0 ]
119
- outMetric := ownMetrics .ScopeMetrics [0 ].Metrics [1 ]
120
- if strings .Contains (inMetric .Name , "outgoing" ) {
121
- inMetric , outMetric = outMetric , inMetric
122
- }
123
-
124
- metricdatatest .AssertAggregationsEqual (t , metricdata.Sum [int64 ]{
125
- Temporality : metricdata .CumulativeTemporality ,
126
- IsMonotonic : true ,
127
- DataPoints : []metricdata.DataPoint [int64 ]{
128
- {
129
- Attributes : attribute .NewSet (attribute.KeyValue {
130
- Key : attribute .Key ("processor" ),
131
- Value : attribute .StringValue (set .ID .String ()),
132
- }),
133
- Value : 3 ,
98
+ testTelemetry .assertMetrics (t , []metricdata.Metrics {
99
+ {
100
+ Name : "otelcol_processor_incoming_metric_points" ,
101
+ Description : "Number of metric points passed to the processor." ,
102
+ Unit : "{datapoints}" ,
103
+ Data : metricdata.Sum [int64 ]{
104
+ Temporality : metricdata .CumulativeTemporality ,
105
+ IsMonotonic : true ,
106
+ DataPoints : []metricdata.DataPoint [int64 ]{
107
+ {
108
+ Value : 2 ,
109
+ Attributes : attribute .NewSet (attribute .String ("processor" , "processorhelper" )),
110
+ },
111
+ },
134
112
},
135
113
},
136
- }, inMetric .Data , metricdatatest .IgnoreTimestamp ())
137
-
138
- metricdatatest .AssertAggregationsEqual (t , metricdata.Sum [int64 ]{
139
- Temporality : metricdata .CumulativeTemporality ,
140
- IsMonotonic : true ,
141
- DataPoints : []metricdata.DataPoint [int64 ]{
142
- {
143
- Attributes : attribute .NewSet (attribute.KeyValue {
144
- Key : attribute .Key ("processor" ),
145
- Value : attribute .StringValue (set .ID .String ()),
146
- }),
147
- Value : 1 ,
114
+ {
115
+ Name : "otelcol_processor_outgoing_metric_points" ,
116
+ Description : "Number of metric points emitted from the processor." ,
117
+ Unit : "{datapoints}" ,
118
+ Data : metricdata.Sum [int64 ]{
119
+ Temporality : metricdata .CumulativeTemporality ,
120
+ IsMonotonic : true ,
121
+ DataPoints : []metricdata.DataPoint [int64 ]{
122
+ {
123
+ Value : 3 ,
124
+ Attributes : attribute .NewSet (attribute .String ("processor" , "processorhelper" )),
125
+ },
126
+ },
148
127
},
149
128
},
150
- }, outMetric . Data , metricdatatest . IgnoreTimestamp () )
129
+ })
151
130
}
0 commit comments