Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ Number of items passed to the processor.
| ---- | ----------- | ---------- | --------- |
| {items} | Sum | Int | true |

### otelcol_processor_incoming_size

Total size of spans passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_outgoing_items

Number of items emitted from the processor.
Expand All @@ -70,6 +78,14 @@ Number of items emitted from the processor.
| ---- | ----------- | ---------- | --------- |
| {items} | Sum | Int | true |

### otelcol_processor_outgoing_size

Total size of spans emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_refused_log_records

Number of log records that were rejected by the next component in the pipeline.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
Expand All @@ -35,7 +36,6 @@
logsFunc ProcessLogsFunc,
options ...Option,
) (processor.Logs, error) {
// TODO: Add observability metrics support
if logsFunc == nil {
return nil, errors.New("nil logsFunc")
}
Expand All @@ -51,10 +51,16 @@

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
marshaler := new(plog.ProtoMarshaler)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
var recordsIn, recordsOut, bytesIn, bytesOut int

span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
recordsIn := ld.LogRecordCount()
recordsIn = ld.LogRecordCount()
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesIn = marshaler.LogsSize(ld)

Check warning on line 62 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L62

Added line #L62 was not covered by tests
}

ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
Expand All @@ -64,8 +70,12 @@
}
return err
}
recordsOut := ld.LogRecordCount()
recordsOut = ld.LogRecordCount()
obs.recordInOut(ctx, recordsIn, recordsOut)
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesOut = marshaler.LogsSize(ld)
obs.recordInOutSize(ctx, bytesIn, bytesOut)

Check warning on line 77 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ telemetry:
value_type: int
monotonic: true

processor_incoming_size:
enabled: true
level: detailed
description: Total size of spans passed to the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_outgoing_size:
enabled: true
level: detailed
description: Total size of spans emitted from the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_accepted_spans:
enabled: true
description: Number of spans successfully pushed into the next component in the pipeline.
Expand Down
17 changes: 13 additions & 4 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
Expand All @@ -35,7 +36,6 @@
metricsFunc ProcessMetricsFunc,
options ...Option,
) (processor.Metrics, error) {
// TODO: Add observability metrics support
if metricsFunc == nil {
return nil, errors.New("nil metricsFunc")
}
Expand All @@ -51,11 +51,16 @@

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
marshaler := new(pmetric.ProtoMarshaler)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
var pointsIn, pointsOut, bytesIn, bytesOut int

span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
pointsIn := md.DataPointCount()

pointsIn = md.DataPointCount()
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesIn = marshaler.MetricsSize(md)

Check warning on line 62 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L62

Added line #L62 was not covered by tests
}
md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -64,8 +69,12 @@
}
return err
}
pointsOut := md.DataPointCount()
pointsOut = md.DataPointCount()
obs.recordInOut(ctx, pointsIn, pointsOut)
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesOut = marshaler.MetricsSize(md)
obs.recordInOutSize(ctx, bytesIn, bytesOut)

Check warning on line 76 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions processor/processorhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/internal"
"go.opentelemetry.io/collector/processor/processorhelper/internal/metadata"
Expand All @@ -34,6 +35,7 @@
type ObsReport struct {
otelAttrs []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
metricLevel configtelemetry.Level
}

// ObsReportSettings are settings for creating an ObsReport.
Expand All @@ -57,6 +59,7 @@
attribute.String(internal.ProcessorKey, cfg.ProcessorID.String()),
},
telemetryBuilder: telemetryBuilder,
metricLevel: cfg.ProcessorCreateSettings.TelemetrySettings.MetricsLevel,
}, nil
}

Expand All @@ -65,6 +68,11 @@
or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...))
}

func (or *ObsReport) recordInOutSize(ctx context.Context, bytesIn, bytesOut int) {
or.telemetryBuilder.ProcessorIncomingSize.Add(ctx, int64(bytesIn), metric.WithAttributes(or.otelAttrs...))
or.telemetryBuilder.ProcessorOutgoingSize.Add(ctx, int64(bytesOut), metric.WithAttributes(or.otelAttrs...))

Check warning on line 73 in processor/processorhelper/obsreport.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/obsreport.go#L71-L73

Added lines #L71 - L73 were not covered by tests
}

func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedCount, refusedCount, droppedCount metric.Int64Counter
switch dataType {
Expand Down
16 changes: 13 additions & 3 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
Expand All @@ -35,7 +36,6 @@
tracesFunc ProcessTracesFunc,
options ...Option,
) (processor.Traces, error) {
// TODO: Add observability Traces support
if tracesFunc == nil {
return nil, errors.New("nil tracesFunc")
}
Expand All @@ -51,10 +51,16 @@

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
marshaler := new(ptrace.ProtoMarshaler)
traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
var spansIn, spansOut, bytesIn, bytesOut int

span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
spansIn := td.SpanCount()
spansIn = td.SpanCount()
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesIn = marshaler.TracesSize(td)

Check warning on line 62 in processor/processorhelper/traces.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/traces.go#L62

Added line #L62 was not covered by tests
}

td, err = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
Expand All @@ -64,8 +70,12 @@
}
return err
}
spansOut := td.SpanCount()
spansOut = td.SpanCount()
obs.recordInOut(ctx, spansIn, spansOut)
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesOut = marshaler.TracesSize(td)
obs.recordInOutSize(ctx, bytesIn, bytesOut)

Check warning on line 77 in processor/processorhelper/traces.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/traces.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
return nextConsumer.ConsumeTraces(ctx, td)
}, bs.consumerOptions...)

Expand Down
Loading