Skip to content

Commit 720f3a8

Browse files
authored
Add profiles support in service (#11024)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This is the last PR to add profiles support, adding it to the service package. This is based after #11023.
1 parent 708fc1a commit 720f3a8

21 files changed

+957
-74
lines changed

.chloggen/service-profiles.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support profiles in the service package
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [11024]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

service/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ require (
2929
go.opentelemetry.io/collector/featuregate v1.14.1
3030
go.opentelemetry.io/collector/internal/globalgates v0.108.1
3131
go.opentelemetry.io/collector/pdata v1.14.1
32+
go.opentelemetry.io/collector/pdata/pprofile v0.108.1
3233
go.opentelemetry.io/collector/pdata/testdata v0.108.1
3334
go.opentelemetry.io/collector/processor v0.108.1
3435
go.opentelemetry.io/collector/processor/processorprofiles v0.108.1
@@ -93,7 +94,6 @@ require (
9394
go.opentelemetry.io/collector/config/configtls v1.14.1 // indirect
9495
go.opentelemetry.io/collector/config/internal v0.108.1 // indirect
9596
go.opentelemetry.io/collector/extension/auth v0.108.1 // indirect
96-
go.opentelemetry.io/collector/pdata/pprofile v0.108.1 // indirect
9797
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
9898
go.opentelemetry.io/contrib/zpages v0.54.0 // indirect
9999
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.5.0 // indirect

service/internal/builders/connector.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,6 @@ import (
1515
"go.opentelemetry.io/collector/consumer/consumerprofiles"
1616
)
1717

18-
// Connector is an interface that allows using implementations of the builder
19-
// from different packages.
20-
type Connector interface {
21-
CreateTracesToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Traces, error)
22-
CreateTracesToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Traces, error)
23-
CreateTracesToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Traces, error)
24-
25-
CreateMetricsToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Metrics, error)
26-
CreateMetricsToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Metrics, error)
27-
CreateMetricsToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Metrics, error)
28-
29-
CreateLogsToTraces(context.Context, connector.Settings, consumer.Traces) (connector.Logs, error)
30-
CreateLogsToMetrics(context.Context, connector.Settings, consumer.Metrics) (connector.Logs, error)
31-
CreateLogsToLogs(context.Context, connector.Settings, consumer.Logs) (connector.Logs, error)
32-
33-
IsConfigured(component.ID) bool
34-
Factory(component.Type) component.Factory
35-
}
36-
3718
// ConnectorBuilder is a helper struct that given a set of Configs and Factories helps with creating connectors.
3819
type ConnectorBuilder struct {
3920
cfgs map[component.ID]component.Config

service/internal/builders/exporter.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@ import (
1313
"go.opentelemetry.io/collector/exporter/exportertest"
1414
)
1515

16-
// Exporter is an interface that allows using implementations of the builder
17-
// from different packages.
18-
type Exporter interface {
19-
CreateTraces(context.Context, exporter.Settings) (exporter.Traces, error)
20-
CreateMetrics(context.Context, exporter.Settings) (exporter.Metrics, error)
21-
CreateLogs(context.Context, exporter.Settings) (exporter.Logs, error)
22-
Factory(component.Type) component.Factory
23-
}
24-
2516
// ExporterBuilder is a helper struct that given a set of Configs and Factories helps with creating exporters.
2617
type ExporterBuilder struct {
2718
cfgs map[component.ID]component.Config

service/internal/builders/processor.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,6 @@ import (
1515
"go.opentelemetry.io/collector/processor/processortest"
1616
)
1717

18-
// Processor is an interface that allows using implementations of the builder
19-
// from different packages.
20-
type Processor interface {
21-
CreateTraces(context.Context, processor.Settings, consumer.Traces) (processor.Traces, error)
22-
CreateMetrics(context.Context, processor.Settings, consumer.Metrics) (processor.Metrics, error)
23-
CreateLogs(context.Context, processor.Settings, consumer.Logs) (processor.Logs, error)
24-
Factory(component.Type) component.Factory
25-
}
26-
2718
// ProcessorBuilder processor is a helper struct that given a set of Configs
2819
// and Factories helps with creating processors.
2920
type ProcessorBuilder struct {

service/internal/builders/receiver.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,6 @@ import (
1515
"go.opentelemetry.io/collector/receiver/receivertest"
1616
)
1717

18-
// Receiver is an interface that allows using implementations of the builder
19-
// from different packages.
20-
type Receiver interface {
21-
CreateTraces(context.Context, receiver.Settings, consumer.Traces) (receiver.Traces, error)
22-
CreateMetrics(context.Context, receiver.Settings, consumer.Metrics) (receiver.Metrics, error)
23-
CreateLogs(context.Context, receiver.Settings, consumer.Logs) (receiver.Logs, error)
24-
Factory(component.Type) component.Factory
25-
}
26-
2718
// ReceiverBuilder receiver is a helper struct that given a set of Configs and
2819
// Factories helps with creating receivers.
2920
type ReceiverBuilder struct {

service/internal/capabilityconsumer/capabilities.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package capabilityconsumer // import "go.opentelemetry.io/collector/service/inte
55

66
import (
77
"go.opentelemetry.io/collector/consumer"
8+
"go.opentelemetry.io/collector/consumer/consumerprofiles"
89
)
910

1011
func NewLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs {
@@ -54,3 +55,19 @@ type capTraces struct {
5455
func (mts capTraces) Capabilities() consumer.Capabilities {
5556
return mts.cap
5657
}
58+
59+
func NewProfiles(profiles consumerprofiles.Profiles, cap consumer.Capabilities) consumerprofiles.Profiles {
60+
if profiles.Capabilities() == cap {
61+
return profiles
62+
}
63+
return capProfiles{Profiles: profiles, cap: cap}
64+
}
65+
66+
type capProfiles struct {
67+
consumerprofiles.Profiles
68+
cap consumer.Capabilities
69+
}
70+
71+
func (mts capProfiles) Capabilities() consumer.Capabilities {
72+
return mts.cap
73+
}

service/internal/capabilityconsumer/capabilities_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,18 @@ func TestTraces(t *testing.T) {
5959
assert.Len(t, sink.AllTraces(), 1)
6060
assert.Equal(t, testdata.GenerateTraces(1), sink.AllTraces()[0])
6161
}
62+
63+
func TestProfiles(t *testing.T) {
64+
sink := &consumertest.ProfilesSink{}
65+
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())
66+
67+
same := NewProfiles(sink, consumer.Capabilities{MutatesData: false})
68+
assert.Same(t, sink, same)
69+
70+
wrap := NewProfiles(sink, consumer.Capabilities{MutatesData: true})
71+
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())
72+
73+
assert.NoError(t, wrap.ConsumeProfiles(context.Background(), testdata.GenerateProfiles(1)))
74+
assert.Len(t, sink.AllProfiles(), 1)
75+
assert.Equal(t, testdata.GenerateProfiles(1), sink.AllProfiles()[0])
76+
}

service/internal/graph/graph.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ import (
2525
"gonum.org/v1/gonum/graph/topo"
2626

2727
"go.opentelemetry.io/collector/component"
28+
"go.opentelemetry.io/collector/component/componentprofiles"
2829
"go.opentelemetry.io/collector/component/componentstatus"
2930
"go.opentelemetry.io/collector/connector"
3031
"go.opentelemetry.io/collector/consumer"
32+
"go.opentelemetry.io/collector/consumer/consumerprofiles"
3133
"go.opentelemetry.io/collector/internal/fanoutconsumer"
3234
"go.opentelemetry.io/collector/service/internal/builders"
3335
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
@@ -40,10 +42,10 @@ type Settings struct {
4042
Telemetry component.TelemetrySettings
4143
BuildInfo component.BuildInfo
4244

43-
ReceiverBuilder builders.Receiver
44-
ProcessorBuilder builders.Processor
45-
ExporterBuilder builders.Exporter
46-
ConnectorBuilder builders.Connector
45+
ReceiverBuilder *builders.ReceiverBuilder
46+
ProcessorBuilder *builders.ProcessorBuilder
47+
ExporterBuilder *builders.ExporterBuilder
48+
ConnectorBuilder *builders.ConnectorBuilder
4749

4850
// PipelineConfigs is a map of component.ID to PipelineConfig.
4951
PipelineConfigs pipelines.Config
@@ -314,6 +316,10 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
314316
cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability)
315317
n.baseConsumer = cc
316318
n.ConsumeLogsFunc = cc.ConsumeLogs
319+
case componentprofiles.DataTypeProfiles:
320+
cc := capabilityconsumer.NewProfiles(next.(consumerprofiles.Profiles), capability)
321+
n.baseConsumer = cc
322+
n.ConsumeProfilesFunc = cc.ConsumeProfiles
317323
}
318324
case *fanOutNode:
319325
nexts := g.nextConsumers(n.ID())
@@ -336,6 +342,12 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
336342
consumers = append(consumers, next.(consumer.Logs))
337343
}
338344
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
345+
case componentprofiles.DataTypeProfiles:
346+
consumers := make([]consumerprofiles.Profiles, 0, len(nexts))
347+
for _, next := range nexts {
348+
consumers = append(consumers, next.(consumerprofiles.Profiles))
349+
}
350+
n.baseConsumer = fanoutconsumer.NewProfiles(consumers)
339351
}
340352
}
341353
if err != nil {
@@ -476,6 +488,7 @@ func (g *Graph) GetExporters() map[component.DataType]map[component.ID]component
476488
exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Component)
477489
exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Component)
478490
exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Component)
491+
exportersMap[componentprofiles.DataTypeProfiles] = make(map[component.ID]component.Component)
479492

480493
for _, pg := range g.pipelines {
481494
for _, expNode := range pg.exporters {
@@ -538,6 +551,8 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
538551
return f.TracesToMetricsStability()
539552
case component.DataTypeLogs:
540553
return f.TracesToLogsStability()
554+
case componentprofiles.DataTypeProfiles:
555+
return f.TracesToProfilesStability()
541556
}
542557
case component.DataTypeMetrics:
543558
switch recType {
@@ -547,6 +562,8 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
547562
return f.MetricsToMetricsStability()
548563
case component.DataTypeLogs:
549564
return f.MetricsToLogsStability()
565+
case componentprofiles.DataTypeProfiles:
566+
return f.MetricsToProfilesStability()
550567
}
551568
case component.DataTypeLogs:
552569
switch recType {
@@ -556,6 +573,19 @@ func connectorStability(f connector.Factory, expType, recType component.Type) co
556573
return f.LogsToMetricsStability()
557574
case component.DataTypeLogs:
558575
return f.LogsToLogsStability()
576+
case componentprofiles.DataTypeProfiles:
577+
return f.LogsToProfilesStability()
578+
}
579+
case componentprofiles.DataTypeProfiles:
580+
switch recType {
581+
case component.DataTypeTraces:
582+
return f.ProfilesToTracesStability()
583+
case component.DataTypeMetrics:
584+
return f.ProfilesToMetricsStability()
585+
case component.DataTypeLogs:
586+
return f.ProfilesToLogsStability()
587+
case componentprofiles.DataTypeProfiles:
588+
return f.ProfilesToProfilesStability()
559589
}
560590
}
561591
return component.StabilityLevelUndefined

0 commit comments

Comments
 (0)