@@ -6,13 +6,20 @@ package processorhelper
6
6
import (
7
7
"context"
8
8
"errors"
9
+ "strings"
9
10
"testing"
10
11
11
12
"github.com/stretchr/testify/assert"
12
13
"github.com/stretchr/testify/require"
14
+ "go.opentelemetry.io/otel/attribute"
15
+ "go.opentelemetry.io/otel/metric"
16
+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
18
+ "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
13
19
14
20
"go.opentelemetry.io/collector/component"
15
21
"go.opentelemetry.io/collector/component/componenttest"
22
+ "go.opentelemetry.io/collector/config/configtelemetry"
16
23
"go.opentelemetry.io/collector/consumer"
17
24
"go.opentelemetry.io/collector/consumer/consumertest"
18
25
"go.opentelemetry.io/collector/pdata/plog"
@@ -67,3 +74,77 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
67
74
return ld , retError
68
75
}
69
76
}
77
+
78
+ func TestLogsProcessor_RecordInOut (t * testing.T ) {
79
+ // Regardless of how many logs are ingested, emit just one
80
+ mockAggregate := func (_ context.Context , _ plog.Logs ) (plog.Logs , error ) {
81
+ ld := plog .NewLogs ()
82
+ ld .ResourceLogs ().AppendEmpty ().ScopeLogs ().AppendEmpty ().LogRecords ().AppendEmpty ()
83
+ return ld , nil
84
+ }
85
+
86
+ incomingLogs := plog .NewLogs ()
87
+ incomingLogRecords := incomingLogs .ResourceLogs ().AppendEmpty ().ScopeLogs ().AppendEmpty ().LogRecords ()
88
+
89
+ // Add 3 records to the incoming
90
+ incomingLogRecords .AppendEmpty ()
91
+ incomingLogRecords .AppendEmpty ()
92
+ incomingLogRecords .AppendEmpty ()
93
+
94
+ metricReader := sdkmetric .NewManualReader ()
95
+ set := processortest .NewNopSettings ()
96
+ set .TelemetrySettings .MetricsLevel = configtelemetry .LevelBasic
97
+ set .TelemetrySettings .LeveledMeterProvider = func (level configtelemetry.Level ) metric.MeterProvider {
98
+ if level >= configtelemetry .LevelBasic {
99
+ return sdkmetric .NewMeterProvider (sdkmetric .WithReader (metricReader ))
100
+ }
101
+ return nil
102
+ }
103
+
104
+ lp , err := NewLogsProcessor (context .Background (), set , & testLogsCfg , consumertest .NewNop (), mockAggregate )
105
+ require .NoError (t , err )
106
+
107
+ assert .NoError (t , lp .Start (context .Background (), componenttest .NewNopHost ()))
108
+ assert .NoError (t , lp .ConsumeLogs (context .Background (), incomingLogs ))
109
+ assert .NoError (t , lp .Shutdown (context .Background ()))
110
+
111
+ ownMetrics := new (metricdata.ResourceMetrics )
112
+ require .NoError (t , metricReader .Collect (context .Background (), ownMetrics ))
113
+
114
+ require .Len (t , ownMetrics .ScopeMetrics , 1 )
115
+ require .Len (t , ownMetrics .ScopeMetrics [0 ].Metrics , 2 )
116
+
117
+ inMetric := ownMetrics .ScopeMetrics [0 ].Metrics [0 ]
118
+ outMetric := ownMetrics .ScopeMetrics [0 ].Metrics [1 ]
119
+ if strings .Contains (inMetric .Name , "outgoing" ) {
120
+ inMetric , outMetric = outMetric , inMetric
121
+ }
122
+
123
+ metricdatatest .AssertAggregationsEqual (t , metricdata.Sum [int64 ]{
124
+ Temporality : metricdata .CumulativeTemporality ,
125
+ IsMonotonic : true ,
126
+ DataPoints : []metricdata.DataPoint [int64 ]{
127
+ {
128
+ Attributes : attribute .NewSet (attribute.KeyValue {
129
+ Key : attribute .Key ("processor" ),
130
+ Value : attribute .StringValue (set .ID .String ()),
131
+ }),
132
+ Value : 3 ,
133
+ },
134
+ },
135
+ }, inMetric .Data , metricdatatest .IgnoreTimestamp ())
136
+
137
+ metricdatatest .AssertAggregationsEqual (t , metricdata.Sum [int64 ]{
138
+ Temporality : metricdata .CumulativeTemporality ,
139
+ IsMonotonic : true ,
140
+ DataPoints : []metricdata.DataPoint [int64 ]{
141
+ {
142
+ Attributes : attribute .NewSet (attribute.KeyValue {
143
+ Key : attribute .Key ("processor" ),
144
+ Value : attribute .StringValue (set .ID .String ()),
145
+ }),
146
+ Value : 1 ,
147
+ },
148
+ },
149
+ }, outMetric .Data , metricdatatest .IgnoreTimestamp ())
150
+ }
0 commit comments