Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/service-profiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support profiles in the service package

# One or more tracking issues or pull requests related to the change
issues: [11024]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
2 changes: 1 addition & 1 deletion service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
go.opentelemetry.io/collector/featuregate v1.14.1
go.opentelemetry.io/collector/internal/globalgates v0.108.1
go.opentelemetry.io/collector/pdata v1.14.1
go.opentelemetry.io/collector/pdata/pprofile v0.108.1
go.opentelemetry.io/collector/pdata/testdata v0.108.1
go.opentelemetry.io/collector/processor v0.108.1
go.opentelemetry.io/collector/processor/processorprofiles v0.108.1
Expand Down Expand Up @@ -93,7 +94,6 @@ require (
go.opentelemetry.io/collector/config/configtls v1.14.1 // indirect
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.108.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/contrib/zpages v0.54.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.5.0 // indirect
Expand Down
19 changes: 0 additions & 19 deletions service/internal/builders/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumerprofiles"
)

// Connector is an interface that allows using implementations of the builder
// from different packages.
type Connector interface {
CreateTracesToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Traces, error)
CreateTracesToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Traces, error)
CreateTracesToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Traces, error)

CreateMetricsToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Metrics, error)
CreateMetricsToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Metrics, error)
CreateMetricsToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Metrics, error)

CreateLogsToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Logs, error)
CreateLogsToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Logs, error)
CreateLogsToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Logs, error)

IsConfigured(component.ID) bool
Factory(component.Type) component.Factory
}

// ConnectorBuilder is a helper struct that given a set of Configs and Factories helps with creating connectors.
type ConnectorBuilder struct {
cfgs map[component.ID]component.Config
Expand Down
9 changes: 0 additions & 9 deletions service/internal/builders/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@ import (
"go.opentelemetry.io/collector/exporter/exportertest"
)

// Exporter is an interface that allows using implementations of the builder
// from different packages.
type Exporter interface {
CreateTraces(context.Context, exporter.Settings) (exporter.Traces, error)
CreateMetrics(context.Context, exporter.Settings) (exporter.Metrics, error)
CreateLogs(context.Context, exporter.Settings) (exporter.Logs, error)
Factory(component.Type) component.Factory
}

// ExporterBuilder is a helper struct that given a set of Configs and Factories helps with creating exporters.
type ExporterBuilder struct {
cfgs map[component.ID]component.Config
Expand Down
9 changes: 0 additions & 9 deletions service/internal/builders/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

// Processor is an interface that allows using implementations of the builder
// from different packages.
type Processor interface {
CreateTraces(context.Context, processor.Settings, consumer.Traces) (processor.Traces, error)
CreateMetrics(context.Context, processor.Settings, consumer.Metrics) (processor.Metrics, error)
CreateLogs(context.Context, processor.Settings, consumer.Logs) (processor.Logs, error)
Factory(component.Type) component.Factory
}

// ProcessorBuilder processor is a helper struct that given a set of Configs
// and Factories helps with creating processors.
type ProcessorBuilder struct {
Expand Down
9 changes: 0 additions & 9 deletions service/internal/builders/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
)

// Receiver is an interface that allows using implementations of the builder
// from different packages.
type Receiver interface {
CreateTraces(context.Context, receiver.Settings, consumer.Traces) (receiver.Traces, error)
CreateMetrics(context.Context, receiver.Settings, consumer.Metrics) (receiver.Metrics, error)
CreateLogs(context.Context, receiver.Settings, consumer.Logs) (receiver.Logs, error)
Factory(component.Type) component.Factory
}

// ReceiverBuilder receiver is a helper struct that given a set of Configs and
// Factories helps with creating receivers.
type ReceiverBuilder struct {
Expand Down
17 changes: 17 additions & 0 deletions service/internal/capabilityconsumer/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package capabilityconsumer // import "go.opentelemetry.io/collector/service/inte

import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
)

func NewLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs {
Expand Down Expand Up @@ -54,3 +55,19 @@ type capTraces struct {
func (mts capTraces) Capabilities() consumer.Capabilities {
return mts.cap
}

func NewProfiles(profiles consumerprofiles.Profiles, cap consumer.Capabilities) consumerprofiles.Profiles {
if profiles.Capabilities() == cap {
return profiles
}
return capProfiles{Profiles: profiles, cap: cap}
}

type capProfiles struct {
consumerprofiles.Profiles
cap consumer.Capabilities
}

func (mts capProfiles) Capabilities() consumer.Capabilities {
return mts.cap
}
15 changes: 15 additions & 0 deletions service/internal/capabilityconsumer/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,18 @@ func TestTraces(t *testing.T) {
assert.Len(t, sink.AllTraces(), 1)
assert.Equal(t, testdata.GenerateTraces(1), sink.AllTraces()[0])
}

func TestProfiles(t *testing.T) {
sink := &consumertest.ProfilesSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

same := NewProfiles(sink, consumer.Capabilities{MutatesData: false})
assert.Same(t, sink, same)

wrap := NewProfiles(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeProfiles(context.Background(), testdata.GenerateProfiles(1)))
assert.Len(t, sink.AllProfiles(), 1)
assert.Equal(t, testdata.GenerateProfiles(1), sink.AllProfiles()[0])
}
38 changes: 34 additions & 4 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentprofiles"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
Expand All @@ -40,10 +42,10 @@ type Settings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

ReceiverBuilder builders.Receiver
ProcessorBuilder builders.Processor
ExporterBuilder builders.Exporter
ConnectorBuilder builders.Connector
ReceiverBuilder *builders.ReceiverBuilder
ProcessorBuilder *builders.ProcessorBuilder
ExporterBuilder *builders.ExporterBuilder
ConnectorBuilder *builders.ConnectorBuilder

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs pipelines.Config
Expand Down Expand Up @@ -314,6 +316,10 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability)
n.baseConsumer = cc
n.ConsumeLogsFunc = cc.ConsumeLogs
case componentprofiles.DataTypeProfiles:
cc := capabilityconsumer.NewProfiles(next.(consumerprofiles.Profiles), capability)
n.baseConsumer = cc
n.ConsumeProfilesFunc = cc.ConsumeProfiles
}
case *fanOutNode:
nexts := g.nextConsumers(n.ID())
Expand All @@ -336,6 +342,12 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
consumers = append(consumers, next.(consumer.Logs))
}
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
case componentprofiles.DataTypeProfiles:
consumers := make([]consumerprofiles.Profiles, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumerprofiles.Profiles))
}
n.baseConsumer = fanoutconsumer.NewProfiles(consumers)
}
}
if err != nil {
Expand Down Expand Up @@ -476,6 +488,7 @@ func (g *Graph) GetExporters() map[component.DataType]map[component.ID]component
exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Component)
exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Component)
exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Component)
exportersMap[componentprofiles.DataTypeProfiles] = make(map[component.ID]component.Component)

for _, pg := range g.pipelines {
for _, expNode := range pg.exporters {
Expand Down Expand Up @@ -538,6 +551,8 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
return f.TracesToMetricsStability()
case component.DataTypeLogs:
return f.TracesToLogsStability()
case componentprofiles.DataTypeProfiles:
return f.TracesToProfilesStability()
}
case component.DataTypeMetrics:
switch recType {
Expand All @@ -547,6 +562,8 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
return f.MetricsToMetricsStability()
case component.DataTypeLogs:
return f.MetricsToLogsStability()
case componentprofiles.DataTypeProfiles:
return f.MetricsToProfilesStability()
}
case component.DataTypeLogs:
switch recType {
Expand All @@ -556,6 +573,19 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
return f.LogsToMetricsStability()
case component.DataTypeLogs:
return f.LogsToLogsStability()
case componentprofiles.DataTypeProfiles:
return f.LogsToProfilesStability()
}
case componentprofiles.DataTypeProfiles:
switch recType {
case component.DataTypeTraces:
return f.ProfilesToTracesStability()
case component.DataTypeMetrics:
return f.ProfilesToMetricsStability()
case component.DataTypeLogs:
return f.ProfilesToLogsStability()
case componentprofiles.DataTypeProfiles:
return f.ProfilesToProfilesStability()
}
}
return component.StabilityLevelUndefined
Expand Down
Loading
Loading