@@ -16,7 +16,10 @@ import (
1616 "github.com/stretchr/testify/require"
1717 "go.opentelemetry.io/collector/component"
1818 "go.opentelemetry.io/collector/component/componenttest"
19+ "go.opentelemetry.io/collector/consumer"
1920 "go.opentelemetry.io/collector/consumer/consumertest"
21+ "go.opentelemetry.io/collector/pdata/plog"
22+ "go.opentelemetry.io/collector/receiver/receiverhelper"
2023 "go.opentelemetry.io/collector/receiver/receivertest"
2124 "gopkg.in/yaml.v2"
2225
@@ -164,6 +167,102 @@ func TestShutdownFlush(t *testing.T) {
164167 )
165168}
166169
170+ func BenchmarkReceiver (b * testing.B ) {
171+ b .Run (
172+ "1 Log entry per iteration" ,
173+ func (b * testing.B ) {
174+ benchmarkReceiver (b , 1 )
175+ },
176+ )
177+ b .Run (
178+ "10 Log entries per iteration" ,
179+ func (b * testing.B ) {
180+ benchmarkReceiver (b , 10 )
181+ },
182+ )
183+ b .Run (
184+ "100 Log entries per iteration" ,
185+ func (b * testing.B ) {
186+ benchmarkReceiver (b , 100 )
187+ },
188+ )
189+ b .Run (
190+ "1_000 Log entries per iteration" ,
191+ func (b * testing.B ) {
192+ benchmarkReceiver (b , 1_000 )
193+ },
194+ )
195+ b .Run (
196+ "10_000 Log entries per iteration" ,
197+ func (b * testing.B ) {
198+ benchmarkReceiver (b , 10_000 )
199+ },
200+ )
201+ }
202+
203+ func benchmarkReceiver (b * testing.B , logsPerIteration int ) {
204+ iterationComplete := make (chan struct {})
205+ nextIteration := make (chan struct {})
206+
207+ inputBuilder := & testInputBuilder {
208+ numberOfLogEntries : logsPerIteration ,
209+ nextIteration : nextIteration ,
210+ }
211+ inputCfg := operator.Config {
212+ Builder : inputBuilder ,
213+ }
214+
215+ set := componenttest .NewNopTelemetrySettings ()
216+ emitter := helper .NewLogEmitter (set )
217+ defer func () {
218+ require .NoError (b , emitter .Stop ())
219+ }()
220+
221+ pipe , err := pipeline.Config {
222+ Operators : []operator.Config {inputCfg },
223+ DefaultOutput : emitter ,
224+ }.Build (set )
225+ require .NoError (b , err )
226+
227+ storageClient := storagetest .NewInMemoryClient (
228+ component .KindReceiver ,
229+ component .MustNewID ("foolog" ),
230+ "test" ,
231+ )
232+
233+ converter := NewConverter (set )
234+
235+ obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {ReceiverCreateSettings : receivertest .NewNopSettings ()})
236+ require .NoError (b , err )
237+
238+ mockConsumer := & testConsumer {
239+ receivedAllLogs : iterationComplete ,
240+ expectedLogs : uint32 (logsPerIteration ),
241+ receivedLogs : atomic.Uint32 {},
242+ }
243+ rcv := & receiver {
244+ set : set ,
245+ pipe : pipe ,
246+ emitter : emitter ,
247+ consumer : mockConsumer ,
248+ converter : converter ,
249+ obsrecv : obsrecv ,
250+ storageClient : storageClient ,
251+ }
252+
253+ b .ResetTimer ()
254+
255+ require .NoError (b , rcv .Start (context .Background (), nil ))
256+
257+ for i := 0 ; i < b .N ; i ++ {
258+ nextIteration <- struct {}{}
259+ <- iterationComplete
260+ mockConsumer .receivedLogs .Store (0 )
261+ }
262+
263+ require .NoError (b , rcv .Shutdown (context .Background ()))
264+ }
265+
167266func BenchmarkReadLine (b * testing.B ) {
168267 filePath := filepath .Join (b .TempDir (), "bench.log" )
169268
@@ -281,3 +380,94 @@ func BenchmarkParseAndMap(b *testing.B) {
281380 }
282381 }
283382}
383+
384+ const testInputOperatorTypeStr = "test_input"
385+
386+ type testInputBuilder struct {
387+ numberOfLogEntries int
388+ nextIteration chan struct {}
389+ }
390+
391+ func (t * testInputBuilder ) ID () string {
392+ return testInputOperatorTypeStr
393+ }
394+
395+ func (t * testInputBuilder ) Type () string {
396+ return testInputOperatorTypeStr
397+ }
398+
399+ func (t * testInputBuilder ) Build (settings component.TelemetrySettings ) (operator.Operator , error ) {
400+ inputConfig := helper .NewInputConfig (t .ID (), testInputOperatorTypeStr )
401+ inputOperator , err := inputConfig .Build (settings )
402+ if err != nil {
403+ return nil , err
404+ }
405+
406+ return & testInputOperator {
407+ InputOperator : inputOperator ,
408+ numberOfLogEntries : t .numberOfLogEntries ,
409+ nextIteration : t .nextIteration ,
410+ }, nil
411+ }
412+
413+ func (t * testInputBuilder ) SetID (_ string ) {}
414+
415+ var _ operator.Operator = & testInputOperator {}
416+
417+ type testInputOperator struct {
418+ helper.InputOperator
419+ numberOfLogEntries int
420+ nextIteration chan struct {}
421+ cancelFunc context.CancelFunc
422+ }
423+
424+ func (t * testInputOperator ) ID () string {
425+ return testInputOperatorTypeStr
426+ }
427+
428+ func (t * testInputOperator ) Type () string {
429+ return testInputOperatorTypeStr
430+ }
431+
432+ func (t * testInputOperator ) Start (_ operator.Persister ) error {
433+ ctx , cancelFunc := context .WithCancel (context .Background ())
434+ t .cancelFunc = cancelFunc
435+
436+ e := complexEntry ()
437+ go func () {
438+ for {
439+ select {
440+ case <- t .nextIteration :
441+ for i := 0 ; i < t .numberOfLogEntries ; i ++ {
442+ _ = t .Write (context .Background (), e )
443+ }
444+ case <- ctx .Done ():
445+ return
446+ }
447+ }
448+
449+ }()
450+ return nil
451+ }
452+
453+ func (t * testInputOperator ) Stop () error {
454+ t .cancelFunc ()
455+ return nil
456+ }
457+
458+ type testConsumer struct {
459+ receivedAllLogs chan struct {}
460+ expectedLogs uint32
461+ receivedLogs atomic.Uint32
462+ }
463+
464+ func (t * testConsumer ) Capabilities () consumer.Capabilities {
465+ return consumer.Capabilities {}
466+ }
467+
468+ func (t * testConsumer ) ConsumeLogs (_ context.Context , ld plog.Logs ) error {
469+ if t .receivedLogs .Add (uint32 (ld .LogRecordCount ())) >= t .expectedLogs {
470+ t .receivedAllLogs <- struct {}{}
471+ }
472+ return nil
473+ }
0 commit comments