Skip to content

Commit bd63054

Browse files
djaglowskijackgopack4
authored andcommitted
[chore][graph] Separate node types (open-telemetry#11321)
Having spent some time on open-telemetry#11311, I think it may be time to start refactoring this codebase into a more maintainable state. This PR just moves the various types of nodes into separate files, which I think is a bit more readable when considering changes.
1 parent c4160ef commit bd63054

File tree

8 files changed

+329
-245
lines changed

8 files changed

+329
-245
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
5+
6+
import (
7+
"go.opentelemetry.io/collector/consumer"
8+
"go.opentelemetry.io/collector/consumer/consumerprofiles"
9+
"go.opentelemetry.io/collector/pipeline"
10+
)
11+
12+
const capabilitiesSeed = "capabilities"
13+
14+
var _ consumerNode = (*capabilitiesNode)(nil)
15+
16+
// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
17+
// There are two purposes for this node:
18+
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
19+
// 2. Present a consistent "first consumer" for each pipeline.
20+
// The nodeID is derived from "pipeline ID".
21+
type capabilitiesNode struct {
22+
nodeID
23+
pipelineID pipeline.ID
24+
baseConsumer
25+
consumer.ConsumeTracesFunc
26+
consumer.ConsumeMetricsFunc
27+
consumer.ConsumeLogsFunc
28+
consumerprofiles.ConsumeProfilesFunc
29+
}
30+
31+
func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
32+
return &capabilitiesNode{
33+
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
34+
pipelineID: pipelineID,
35+
}
36+
}
37+
38+
func (n *capabilitiesNode) getConsumer() baseConsumer {
39+
return n
40+
}

service/internal/graph/nodes.go renamed to service/internal/graph/connector.go

Lines changed: 1 addition & 245 deletions
Original file line numberDiff line numberDiff line change
@@ -5,215 +5,20 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"
55

66
import (
77
"context"
8-
"fmt"
9-
"hash/fnv"
10-
"strings"
118

129
"go.opentelemetry.io/collector/component"
1310
"go.opentelemetry.io/collector/component/componentprofiles"
1411
"go.opentelemetry.io/collector/connector"
1512
"go.opentelemetry.io/collector/connector/connectorprofiles"
1613
"go.opentelemetry.io/collector/consumer"
1714
"go.opentelemetry.io/collector/consumer/consumerprofiles"
18-
"go.opentelemetry.io/collector/exporter"
19-
"go.opentelemetry.io/collector/internal/fanoutconsumer"
2015
"go.opentelemetry.io/collector/pipeline"
21-
"go.opentelemetry.io/collector/processor"
22-
"go.opentelemetry.io/collector/receiver"
2316
"go.opentelemetry.io/collector/service/internal/builders"
2417
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
2518
"go.opentelemetry.io/collector/service/internal/components"
2619
)
2720

28-
const (
29-
receiverSeed = "receiver"
30-
processorSeed = "processor"
31-
exporterSeed = "exporter"
32-
connectorSeed = "connector"
33-
capabilitiesSeed = "capabilities"
34-
fanOutToExporters = "fanout_to_exporters"
35-
)
36-
37-
// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
38-
type baseConsumer interface {
39-
Capabilities() consumer.Capabilities
40-
}
41-
42-
type nodeID int64
43-
44-
func (n nodeID) ID() int64 {
45-
return int64(n)
46-
}
47-
48-
func newNodeID(parts ...string) nodeID {
49-
h := fnv.New64a()
50-
h.Write([]byte(strings.Join(parts, "|")))
51-
return nodeID(h.Sum64())
52-
}
53-
54-
type consumerNode interface {
55-
getConsumer() baseConsumer
56-
}
57-
58-
// A receiver instance can be shared by multiple pipelines of the same type.
59-
// Therefore, nodeID is derived from "pipeline type" and "component ID".
60-
type receiverNode struct {
61-
nodeID
62-
componentID component.ID
63-
pipelineType pipeline.Signal
64-
component.Component
65-
}
66-
67-
func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode {
68-
return &receiverNode{
69-
nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()),
70-
componentID: recvID,
71-
pipelineType: pipelineType,
72-
}
73-
}
74-
75-
func (n *receiverNode) buildComponent(ctx context.Context,
76-
tel component.TelemetrySettings,
77-
info component.BuildInfo,
78-
builder *builders.ReceiverBuilder,
79-
nexts []baseConsumer,
80-
) error {
81-
tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType)
82-
set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
83-
var err error
84-
switch n.pipelineType {
85-
case pipeline.SignalTraces:
86-
var consumers []consumer.Traces
87-
for _, next := range nexts {
88-
consumers = append(consumers, next.(consumer.Traces))
89-
}
90-
n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
91-
case pipeline.SignalMetrics:
92-
var consumers []consumer.Metrics
93-
for _, next := range nexts {
94-
consumers = append(consumers, next.(consumer.Metrics))
95-
}
96-
n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
97-
case pipeline.SignalLogs:
98-
var consumers []consumer.Logs
99-
for _, next := range nexts {
100-
consumers = append(consumers, next.(consumer.Logs))
101-
}
102-
n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
103-
case componentprofiles.SignalProfiles:
104-
var consumers []consumerprofiles.Profiles
105-
for _, next := range nexts {
106-
consumers = append(consumers, next.(consumerprofiles.Profiles))
107-
}
108-
n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers))
109-
default:
110-
return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType)
111-
}
112-
if err != nil {
113-
return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err)
114-
}
115-
return nil
116-
}
117-
118-
var _ consumerNode = (*processorNode)(nil)
119-
120-
// Every processor instance is unique to one pipeline.
121-
// Therefore, nodeID is derived from "pipeline ID" and "component ID".
122-
type processorNode struct {
123-
nodeID
124-
componentID component.ID
125-
pipelineID pipeline.ID
126-
component.Component
127-
}
128-
129-
func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode {
130-
return &processorNode{
131-
nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()),
132-
componentID: procID,
133-
pipelineID: pipelineID,
134-
}
135-
}
136-
137-
func (n *processorNode) getConsumer() baseConsumer {
138-
return n.Component.(baseConsumer)
139-
}
140-
141-
func (n *processorNode) buildComponent(ctx context.Context,
142-
tel component.TelemetrySettings,
143-
info component.BuildInfo,
144-
builder *builders.ProcessorBuilder,
145-
next baseConsumer,
146-
) error {
147-
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
148-
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
149-
var err error
150-
switch n.pipelineID.Signal() {
151-
case pipeline.SignalTraces:
152-
n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces))
153-
case pipeline.SignalMetrics:
154-
n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics))
155-
case pipeline.SignalLogs:
156-
n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs))
157-
case componentprofiles.SignalProfiles:
158-
n.Component, err = builder.CreateProfiles(ctx, set, next.(consumerprofiles.Profiles))
159-
default:
160-
return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal())
161-
}
162-
if err != nil {
163-
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
164-
}
165-
return nil
166-
}
167-
168-
var _ consumerNode = (*exporterNode)(nil)
169-
170-
// An exporter instance can be shared by multiple pipelines of the same type.
171-
// Therefore, nodeID is derived from "pipeline type" and "component ID".
172-
type exporterNode struct {
173-
nodeID
174-
componentID component.ID
175-
pipelineType pipeline.Signal
176-
component.Component
177-
}
178-
179-
func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
180-
return &exporterNode{
181-
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
182-
componentID: exprID,
183-
pipelineType: pipelineType,
184-
}
185-
}
186-
187-
func (n *exporterNode) getConsumer() baseConsumer {
188-
return n.Component.(baseConsumer)
189-
}
190-
191-
func (n *exporterNode) buildComponent(
192-
ctx context.Context,
193-
tel component.TelemetrySettings,
194-
info component.BuildInfo,
195-
builder *builders.ExporterBuilder,
196-
) error {
197-
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
198-
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
199-
var err error
200-
switch n.pipelineType {
201-
case pipeline.SignalTraces:
202-
n.Component, err = builder.CreateTraces(ctx, set)
203-
case pipeline.SignalMetrics:
204-
n.Component, err = builder.CreateMetrics(ctx, set)
205-
case pipeline.SignalLogs:
206-
n.Component, err = builder.CreateLogs(ctx, set)
207-
case componentprofiles.SignalProfiles:
208-
n.Component, err = builder.CreateProfiles(ctx, set)
209-
default:
210-
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
211-
}
212-
if err != nil {
213-
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
214-
}
215-
return nil
216-
}
21+
const connectorSeed = "connector"
21722

21823
var _ consumerNode = (*connectorNode)(nil)
21924

@@ -420,52 +225,3 @@ func (n *connectorNode) buildComponent(
420225
}
421226
return nil
422227
}
423-
424-
var _ consumerNode = (*capabilitiesNode)(nil)
425-
426-
// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
427-
// There are two purposes for this node:
428-
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
429-
// 2. Present a consistent "first consumer" for each pipeline.
430-
// The nodeID is derived from "pipeline ID".
431-
type capabilitiesNode struct {
432-
nodeID
433-
pipelineID pipeline.ID
434-
baseConsumer
435-
consumer.ConsumeTracesFunc
436-
consumer.ConsumeMetricsFunc
437-
consumer.ConsumeLogsFunc
438-
consumerprofiles.ConsumeProfilesFunc
439-
}
440-
441-
func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
442-
return &capabilitiesNode{
443-
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
444-
pipelineID: pipelineID,
445-
}
446-
}
447-
448-
func (n *capabilitiesNode) getConsumer() baseConsumer {
449-
return n
450-
}
451-
452-
var _ consumerNode = (*fanOutNode)(nil)
453-
454-
// Each pipeline has one fan-out node before exporters.
455-
// Therefore, nodeID is derived from "pipeline ID".
456-
type fanOutNode struct {
457-
nodeID
458-
pipelineID pipeline.ID
459-
baseConsumer
460-
}
461-
462-
func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
463-
return &fanOutNode{
464-
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
465-
pipelineID: pipelineID,
466-
}
467-
}
468-
469-
func (n *fanOutNode) getConsumer() baseConsumer {
470-
return n.baseConsumer
471-
}

service/internal/graph/consumer.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
5+
6+
import (
7+
"go.opentelemetry.io/collector/consumer"
8+
)
9+
10+
// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
11+
type baseConsumer interface {
12+
Capabilities() consumer.Capabilities
13+
}
14+
15+
type consumerNode interface {
16+
getConsumer() baseConsumer
17+
}

service/internal/graph/exporter.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/component/componentprofiles"
12+
"go.opentelemetry.io/collector/exporter"
13+
"go.opentelemetry.io/collector/pipeline"
14+
"go.opentelemetry.io/collector/service/internal/builders"
15+
"go.opentelemetry.io/collector/service/internal/components"
16+
)
17+
18+
const exporterSeed = "exporter"
19+
20+
var _ consumerNode = (*exporterNode)(nil)
21+
22+
// An exporter instance can be shared by multiple pipelines of the same type.
23+
// Therefore, nodeID is derived from "pipeline type" and "component ID".
24+
type exporterNode struct {
25+
nodeID
26+
componentID component.ID
27+
pipelineType pipeline.Signal
28+
component.Component
29+
}
30+
31+
func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
32+
return &exporterNode{
33+
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
34+
componentID: exprID,
35+
pipelineType: pipelineType,
36+
}
37+
}
38+
39+
func (n *exporterNode) getConsumer() baseConsumer {
40+
return n.Component.(baseConsumer)
41+
}
42+
43+
func (n *exporterNode) buildComponent(
44+
ctx context.Context,
45+
tel component.TelemetrySettings,
46+
info component.BuildInfo,
47+
builder *builders.ExporterBuilder,
48+
) error {
49+
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
50+
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
51+
var err error
52+
switch n.pipelineType {
53+
case pipeline.SignalTraces:
54+
n.Component, err = builder.CreateTraces(ctx, set)
55+
case pipeline.SignalMetrics:
56+
n.Component, err = builder.CreateMetrics(ctx, set)
57+
case pipeline.SignalLogs:
58+
n.Component, err = builder.CreateLogs(ctx, set)
59+
case componentprofiles.SignalProfiles:
60+
n.Component, err = builder.CreateProfiles(ctx, set)
61+
default:
62+
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
63+
}
64+
if err != nil {
65+
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
66+
}
67+
return nil
68+
}

0 commit comments

Comments
 (0)