Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0dcde12
[processor/transform] Create `additional*Funcs` parameters for NewPro…
franciscovalentecastro Apr 17, 2025
b12567c
[processor/transform] Create `transformProcessorFactory` and `WithAdd…
franciscovalentecastro May 8, 2025
1c09ed9
[processor/transform] Create `Validate*Statements` methods to be used…
franciscovalentecastro May 9, 2025
6219553
[processor/transform] Update README.md.
franciscovalentecastro May 9, 2025
0a66d4e
[processor/transform] Create `ValidateWithAdditionalFunctions` and f…
franciscovalentecastro May 12, 2025
95b4445
[processor/transform] Create `OttlProcessorFactory` and use unexporte…
franciscovalentecastro May 12, 2025
d3cf66a
[processor/transform] Add more `functions` and `processor` tests.
franciscovalentecastro May 21, 2025
f1060d6
[processor/transform] Remove `README.md` update.
franciscovalentecastro May 21, 2025
71f20ff
[processor/transform] Remove `pkg/ottl/ottlprocessor` package and mov…
franciscovalentecastro May 21, 2025
928adb3
[processor/transform] Refactor `WithAdditional*Functions` to get logs…
franciscovalentecastro May 22, 2025
10a92f2
[processor/transform] Revert changes on `internal/functions.go`.
franciscovalentecastro May 22, 2025
b66edc7
[processor/transform] Create `With*Functions` factory options instead…
franciscovalentecastro May 23, 2025
9bc00fb
[processor/test] Use variadic arguments in With*Functions.
franciscovalentecastro May 23, 2025
cea8fb6
[processor/transform] Add CHANGELOG entry.
franciscovalentecastro May 26, 2025
706fb13
[processor/transform] Fix lint error.
franciscovalentecastro May 28, 2025
5b368de
[processor/transform] Create `NewFactoryWithOptions` to avoid failure…
franciscovalentecastro May 28, 2025
29b456d
[processor/transform] Add `assertConfigContainsDefaultFunctions` to a…
franciscovalentecastro Jun 4, 2025
30fce1b
[processor/transform] Update non-default functions log and remove `Su…
franciscovalentecastro Jun 4, 2025
c8101cf
[processor/transform] Update `With*Functions` to make possible to use…
franciscovalentecastro Jun 4, 2025
a2eeed7
[processor/transform] Fix unnecessary update.
franciscovalentecastro Jun 10, 2025
a755aa1
[processor/transform] Add `api` to change_logs.
franciscovalentecastro Jun 10, 2025
6e101a0
[processor/transform] Update With*Functions comments, non-default fun…
franciscovalentecastro Jun 10, 2025
664778a
[process/transform] Update test to add cases without default functions.
franciscovalentecastro Jun 11, 2025
d4af5a1
[processor/transform] Update to uppercase OTTL.
franciscovalentecastro Jun 11, 2025
7d3c29c
[processor/transform] Add log with fields to know which context regis…
franciscovalentecastro Jun 11, 2025
825d3e4
[process/transform] Nit, remove `s` from traces.
franciscovalentecastro Jun 11, 2025
5e2a5ad
Merge branch 'main' into fcovalente-with-add-ottl-funcs
evan-bradley Jun 16, 2025
9836ba1
Merge branch 'main' into fcovalente-with-add-ottl-funcs
evan-bradley Jun 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/fcovalente-with-add-ottl-funcs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Create `With*Functions` factory options to provide custom OTTL functions for logs, metrics or traces to the resulting transform processor.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [39698]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 14 additions & 6 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

var (
Expand All @@ -45,6 +47,12 @@ type Config struct {

FlattenData bool `mapstructure:"flatten_data"`
logger *zap.Logger

dataPointFunctions map[string]ottl.Factory[ottldatapoint.TransformContext]
logFunctions map[string]ottl.Factory[ottllog.TransformContext]
metricFunctions map[string]ottl.Factory[ottlmetric.TransformContext]
spanEventFunctions map[string]ottl.Factory[ottlspanevent.TransformContext]
spanFunctions map[string]ottl.Factory[ottlspan.TransformContext]
}

// Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config),
Expand Down Expand Up @@ -133,7 +141,7 @@ func (c *Config) Validate() error {
var errors error

if len(c.TraceStatements) > 0 {
pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions()))
pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(c.spanFunctions), common.WithSpanEventParser(c.spanEventFunctions))
if err != nil {
return err
}
Expand All @@ -146,7 +154,7 @@ func (c *Config) Validate() error {
}

if len(c.MetricStatements) > 0 {
pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions()))
pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(c.metricFunctions), common.WithDataPointParser(c.dataPointFunctions))
if err != nil {
return err
}
Expand All @@ -159,7 +167,7 @@ func (c *Config) Validate() error {
}

if len(c.LogStatements) > 0 {
pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions()))
pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(c.logFunctions))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ func TestLoadConfig(t *testing.T) {
}
} else {
assert.NoError(t, xconfmap.Validate(cfg))
assert.Equal(t, tt.expected, cfg)
assert.EqualExportedValues(t, tt.expected, cfg)
assertConfigContainsDefaultFunctions(t, *cfg.(*Config))
}
})
}
Expand Down
151 changes: 133 additions & 18 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metadata"
Expand All @@ -22,34 +28,133 @@ import (

var processorCapabilities = consumer.Capabilities{MutatesData: true}

type transformProcessorFactory struct {
dataPointFunctions map[string]ottl.Factory[ottldatapoint.TransformContext]
logFunctions map[string]ottl.Factory[ottllog.TransformContext]
metricFunctions map[string]ottl.Factory[ottlmetric.TransformContext]
spanEventFunctions map[string]ottl.Factory[ottlspanevent.TransformContext]
spanFunctions map[string]ottl.Factory[ottlspan.TransformContext]
defaultDataPointFunctionsOverridden bool
defaultLogFunctionsOverridden bool
defaultMetricFunctionsOverridden bool
defaultSpanEventFunctionsOverridden bool
defaultSpanFunctionsOverridden bool
}

// FactoryOption applies changes to transformProcessorFactory.
type FactoryOption func(factory *transformProcessorFactory)

// WithDataPointFunctions will override the default OTTL datapoint context functions with the provided dataPointFunctions in resulting processor.
// Subsequent uses of WithDataPointFunctions will merge the provided dataPointFunctions with the previously registered functions.
func WithDataPointFunctions(dataPointFunctions []ottl.Factory[ottldatapoint.TransformContext]) FactoryOption {
return func(factory *transformProcessorFactory) {
if !factory.defaultDataPointFunctionsOverridden {
factory.dataPointFunctions = map[string]ottl.Factory[ottldatapoint.TransformContext]{}
factory.defaultDataPointFunctionsOverridden = true
}
factory.dataPointFunctions = mergeFunctionsToMap(factory.dataPointFunctions, dataPointFunctions)
}
}

// WithLogFunctions will override the default OTTL log context functions with the provided logFunctions in the resulting processor.
// Subsequent uses of WithLogFunctions will merge the provided logFunctions with the previously registered functions.
func WithLogFunctions(logFunctions []ottl.Factory[ottllog.TransformContext]) FactoryOption {
return func(factory *transformProcessorFactory) {
if !factory.defaultLogFunctionsOverridden {
factory.logFunctions = map[string]ottl.Factory[ottllog.TransformContext]{}
factory.defaultLogFunctionsOverridden = true
}
factory.logFunctions = mergeFunctionsToMap(factory.logFunctions, logFunctions)
}
}

// WithMetricFunctions will override the default OTTL metric context functions with the provided metricFunctions in the resulting processor.
// Subsequent uses of WithMetricFunctions will merge the provided metricFunctions with the previously registered functions.
func WithMetricFunctions(metricFunctions []ottl.Factory[ottlmetric.TransformContext]) FactoryOption {
return func(factory *transformProcessorFactory) {
if !factory.defaultMetricFunctionsOverridden {
factory.metricFunctions = map[string]ottl.Factory[ottlmetric.TransformContext]{}
factory.defaultMetricFunctionsOverridden = true
}
factory.metricFunctions = mergeFunctionsToMap(factory.metricFunctions, metricFunctions)
}
}

// WithSpanEventFunctions will override the default OTTL spanevent context functions with the provided spanEventFunctions in the resulting processor.
// Subsequent uses of WithSpanEventFunctions will merge the provided spanEventFunctions with the previously registered functions.
func WithSpanEventFunctions(spanEventFunctions []ottl.Factory[ottlspanevent.TransformContext]) FactoryOption {
return func(factory *transformProcessorFactory) {
if !factory.defaultSpanEventFunctionsOverridden {
factory.spanEventFunctions = map[string]ottl.Factory[ottlspanevent.TransformContext]{}
factory.defaultSpanEventFunctionsOverridden = true
}
factory.spanEventFunctions = mergeFunctionsToMap(factory.spanEventFunctions, spanEventFunctions)
}
}

// WithSpanFunctions will override the default OTTL span context functions with the provided spanFunctions in the resulting processor.
// Subsequent uses of WithSpanFunctions will merge the provided spanFunctions with the previously registered functions.
func WithSpanFunctions(spanFunctions []ottl.Factory[ottlspan.TransformContext]) FactoryOption {
return func(factory *transformProcessorFactory) {
if !factory.defaultSpanFunctionsOverridden {
factory.spanFunctions = map[string]ottl.Factory[ottlspan.TransformContext]{}
factory.defaultSpanFunctionsOverridden = true
}
factory.spanFunctions = mergeFunctionsToMap(factory.spanFunctions, spanFunctions)
}
}

func NewFactory() processor.Factory {
return NewFactoryWithOptions()
}

// NewFactoryWithOptions can receive FactoryOption like With*Functions to register non-default OTTL functions in the resulting processor.
func NewFactoryWithOptions(options ...FactoryOption) processor.Factory {
f := &transformProcessorFactory{
dataPointFunctions: defaultDataPointFunctionsMap(),
logFunctions: defaultLogFunctionsMap(),
metricFunctions: defaultMetricFunctionsMap(),
spanEventFunctions: defaultSpanEventFunctionsMap(),
spanFunctions: defaultSpanFunctionsMap(),
}
for _, o := range options {
o(f)
}

return processor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
processor.WithTraces(createTracesProcessor, metadata.TracesStability),
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
f.createDefaultConfig,
processor.WithLogs(f.createLogsProcessor, metadata.LogsStability),
processor.WithTraces(f.createTracesProcessor, metadata.TracesStability),
processor.WithMetrics(f.createMetricsProcessor, metadata.MetricsStability),
)
}

func createDefaultConfig() component.Config {
func (f *transformProcessorFactory) createDefaultConfig() component.Config {
return &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{},
MetricStatements: []common.ContextStatements{},
LogStatements: []common.ContextStatements{},
dataPointFunctions: f.dataPointFunctions,
logFunctions: f.logFunctions,
metricFunctions: f.metricFunctions,
spanEventFunctions: f.spanEventFunctions,
spanFunctions: f.spanFunctions,
}
}

func createLogsProcessor(
func (f *transformProcessorFactory) createLogsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, oCfg.FlattenData, set.TelemetrySettings)
if f.defaultLogFunctionsOverridden {
set.Logger.Debug("non-default OTTL log functions have been registered in the \"transform\" processor", zap.Bool("log", f.defaultLogFunctionsOverridden))
}
proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, oCfg.FlattenData, set.TelemetrySettings, f.logFunctions)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -62,15 +167,20 @@ func createLogsProcessor(
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
func (f *transformProcessorFactory) createTracesProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
oCfg := cfg.(*Config)

proc, err := traces.NewProcessor(oCfg.TraceStatements, oCfg.ErrorMode, set.TelemetrySettings)
if f.defaultSpanEventFunctionsOverridden || f.defaultSpanFunctionsOverridden {
set.Logger.Debug("non-default OTTL trace functions have been registered in the \"transform\" processor",
zap.Bool("span", f.defaultSpanFunctionsOverridden),
zap.Bool("spanevent", f.defaultSpanEventFunctionsOverridden),
)
}
proc, err := traces.NewProcessor(oCfg.TraceStatements, oCfg.ErrorMode, set.TelemetrySettings, f.spanFunctions, f.spanEventFunctions)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -83,16 +193,21 @@ func createTracesProcessor(
processorhelper.WithCapabilities(processorCapabilities))
}

func createMetricsProcessor(
func (f *transformProcessorFactory) createMetricsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
oCfg := cfg.(*Config)
oCfg.logger = set.Logger

proc, err := metrics.NewProcessor(oCfg.MetricStatements, oCfg.ErrorMode, set.TelemetrySettings)
if f.defaultDataPointFunctionsOverridden || f.defaultMetricFunctionsOverridden {
set.Logger.Debug("non-default OTTL metric functions have been registered in the \"transform\" processor",
zap.Bool("datapoint", f.defaultDataPointFunctionsOverridden),
zap.Bool("metric", f.defaultMetricFunctionsOverridden),
)
}
proc, err := metrics.NewProcessor(oCfg.MetricStatements, oCfg.ErrorMode, set.TelemetrySettings, f.metricFunctions, f.dataPointFunctions)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand Down
Loading