Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions service/internal/builders/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func (b *ConnectorBuilder) CreateTracesToTraces(ctx context.Context, set connect
}

logStabilityLevel(set.Logger, f.TracesToTracesStability())
return f.CreateTracesToTraces(ctx, set, cfg, next)
conn, err := f.CreateTracesToTraces(ctx, set, cfg, next)
if err != nil {
return nil, err
}
return wrapTraces(set.ID, set.TelemetrySettings, conn), nil
}

// CreateTracesToMetrics creates a Traces connector based on the settings and config.
Expand All @@ -67,7 +71,11 @@ func (b *ConnectorBuilder) CreateTracesToMetrics(ctx context.Context, set connec
}

logStabilityLevel(set.Logger, f.TracesToMetricsStability())
return f.CreateTracesToMetrics(ctx, set, cfg, next)
conn, err := f.CreateTracesToMetrics(ctx, set, cfg, next)
if err != nil {
return nil, err
}
return wrapTraces(set.ID, set.TelemetrySettings, conn), nil
}

// CreateTracesToLogs creates a Traces connector based on the settings and config.
Expand All @@ -86,7 +94,11 @@ func (b *ConnectorBuilder) CreateTracesToLogs(ctx context.Context, set connector
}

logStabilityLevel(set.Logger, f.TracesToLogsStability())
return f.CreateTracesToLogs(ctx, set, cfg, next)
conn, err := f.CreateTracesToLogs(ctx, set, cfg, next)
if err != nil {
return nil, err
}
return wrapTraces(set.ID, set.TelemetrySettings, conn), nil
}

// CreateTracesToProfiles creates a Traces connector based on the settings and config.
Expand All @@ -110,7 +122,11 @@ func (b *ConnectorBuilder) CreateTracesToProfiles(ctx context.Context, set conne
}

logStabilityLevel(set.Logger, f.TracesToProfilesStability())
return f.CreateTracesToProfiles(ctx, set, cfg, next)
conn, err := f.CreateTracesToProfiles(ctx, set, cfg, next)
if err != nil {
return nil, err
}
return wrapTraces(set.ID, set.TelemetrySettings, conn), nil
}

// CreateMetricsToTraces creates a Metrics connector based on the settings and config.
Expand Down
18 changes: 13 additions & 5 deletions service/internal/builders/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,37 +121,38 @@ func TestConnectorBuilder(t *testing.T) {
cfgs := map[component.ID]component.Config{tt.id: defaultCfg}
b := NewConnector(cfgs, factories)

t2t, err := b.CreateTracesToTraces(context.Background(), createConnectorSettings(tt.id), tt.nextTraces)
set := createConnectorSettings(tt.id)
t2t, err := b.CreateTracesToTraces(context.Background(), set, tt.nextTraces)
if expectedErr := tt.err(pipeline.SignalTraces, pipeline.SignalTraces); expectedErr != "" {
assert.EqualError(t, err, expectedErr)
assert.Nil(t, t2t)
} else {
assert.NoError(t, err)
assert.Equal(t, nopConnectorInstance, t2t)
assertTracesIsnopConnectorInstance(t, t2t)
}
t2m, err := b.CreateTracesToMetrics(context.Background(), createConnectorSettings(tt.id), tt.nextMetrics)
if expectedErr := tt.err(pipeline.SignalTraces, pipeline.SignalMetrics); expectedErr != "" {
assert.EqualError(t, err, expectedErr)
assert.Nil(t, t2m)
} else {
assert.NoError(t, err)
assert.Equal(t, nopConnectorInstance, t2m)
assertTracesIsnopConnectorInstance(t, t2m)
}
t2l, err := b.CreateTracesToLogs(context.Background(), createConnectorSettings(tt.id), tt.nextLogs)
if expectedErr := tt.err(pipeline.SignalTraces, pipeline.SignalLogs); expectedErr != "" {
assert.EqualError(t, err, expectedErr)
assert.Nil(t, t2l)
} else {
assert.NoError(t, err)
assert.Equal(t, nopConnectorInstance, t2l)
assertTracesIsnopConnectorInstance(t, t2l)
}
t2p, err := b.CreateTracesToProfiles(context.Background(), createConnectorSettings(tt.id), tt.nextProfiles)
if expectedErr := tt.err(pipeline.SignalTraces, componentprofiles.SignalProfiles); expectedErr != "" {
assert.EqualError(t, err, expectedErr)
assert.Nil(t, t2p)
} else {
assert.NoError(t, err)
assert.Equal(t, nopConnectorInstance, t2p)
assertTracesIsnopConnectorInstance(t, t2p)
}

m2t, err := b.CreateMetricsToTraces(context.Background(), createConnectorSettings(tt.id), tt.nextTraces)
Expand Down Expand Up @@ -476,6 +477,13 @@ func TestNewNopConnectorConfigsAndFactories(t *testing.T) {
assert.IsType(t, profilesToProfiles, bProfilesToProfiles)
}

func assertTracesIsnopConnectorInstance(t *testing.T, conn connector.Traces) {
t.Helper()
dt, ok := conn.(*debugTraces)
require.True(t, ok)
assert.Equal(t, nopConnectorInstance, dt.tc)
}

var nopConnectorInstance = &nopConnector{
Consumer: consumertest.NewNop(),
}
Expand Down
56 changes: 56 additions & 0 deletions service/internal/builders/traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package builders // import "go.opentelemetry.io/collector/service/internal/builders"

import (
"context"
"reflect"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type tracesComponent interface {
component.Component
consumer.Traces
}

type debugTraces struct {
id component.ID
set component.TelemetrySettings
tc tracesComponent
}

func (dc *debugTraces) Start(ctx context.Context, host component.Host) error {
return dc.tc.Start(ctx, host)
}

func (dc *debugTraces) Shutdown(ctx context.Context) error {
return dc.tc.Shutdown(ctx)
}

func (dc *debugTraces) Capabilities() consumer.Capabilities {
return dc.tc.Capabilities()
}

func (dc *debugTraces) ConsumeTraces(ctx context.Context, tr ptrace.Traces) error {
err := dc.tc.ConsumeTraces(ctx, tr)
if err == nil {
return nil
}
val := reflect.ValueOf(err)
if val.Kind() == reflect.Ptr && val.IsNil() {
dc.set.Logger.Error("nil error that is not nil",
zap.String("error_type", val.Type().String()))
return nil
}
return err
}

func wrapTraces(id component.ID, set component.TelemetrySettings, tc tracesComponent) tracesComponent {
return &debugTraces{id: id, set: set, tc: tc}
}
Loading