Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 72 additions & 32 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,17 @@ type Settings struct {

// Service represents the implementation of a component.Host.
type Service struct {
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
telemetryProviders telemetry.Providers
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
loggerProvider telemetry.LoggerProvider
meterProvider telemetry.MeterProvider
tracerProvider telemetry.TracerProvider
}

// New creates a new Service, its telemetry, and Components.
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr error) {
srv := &Service{
buildInfo: set.BuildInfo,
host: &graph.Host{
Expand All @@ -123,37 +125,30 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
collectorConf: set.CollectorConf,
}

telFactory := otelconftelemetry.NewFactory()
telset := telemetry.Settings{
BuildInfo: set.BuildInfo,
ZapOptions: set.LoggingOptions,
DefaultViews: configureViews,
}
telemetryFactory := otelconftelemetry.NewFactory()
telemetrySettings := telemetry.Settings{BuildInfo: set.BuildInfo}

telProviders, err := telFactory.CreateProviders(ctx, telset, &cfg.Telemetry)
// Create the logger & LoggerProvider first. These may be used
// when creating the other telemetry providers.
loggerSettings := telemetry.LoggerSettings{
Settings: telemetrySettings,
ZapOptions: set.LoggingOptions,
}
logger, loggerProvider, err := telemetryFactory.CreateLogger(ctx, loggerSettings, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create telemetry providers: %w", err)
return nil, fmt.Errorf("failed to create logger: %w", err)
}
srv.telemetryProviders = telProviders
defer func() {
if err != nil {
err = multierr.Append(err, telProviders.Shutdown(ctx))
}
}()

// Use initialized logger to handle any subsequent errors
// https://github.com/open-telemetry/opentelemetry-collector/pull/13081
logger := telProviders.Logger()
defer func() {
if err != nil {
logger.Error("error found during service initialization", zap.Error(err))
if resultErr != nil {
logger.Error("error found during service initialization", zap.Error(resultErr))
resultErr = multierr.Append(resultErr, loggerProvider.Shutdown(ctx))
}
}()
srv.loggerProvider = loggerProvider

// Wrap the zap.Logger with componentattribute so scope attributes
// can be added and removed dynamically, and tee logs to the
// LoggerProvider.
loggerProvider := telProviders.LoggerProvider()
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
core = componentattribute.NewOTelTeeCoreWithAttributes(
Expand All @@ -165,11 +160,50 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return core
}))

meterSettings := telemetry.MeterSettings{
Settings: telemetrySettings,
Logger: logger,
}
mpConfig := &cfg.Telemetry.Metrics.MeterProvider
if mpConfig.Views == nil {
mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level)
}
meterProvider, err := telemetryFactory.CreateMeterProvider(ctx, meterSettings, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create meter provider: %w", err)
}
defer func() {
if resultErr != nil {
resultErr = multierr.Append(resultErr, meterProvider.Shutdown(ctx))
}
}()
srv.meterProvider = meterProvider

tracerSettings := telemetry.TracerSettings{
Settings: telemetrySettings,
Logger: logger,
}
tracerProvider, err := telemetryFactory.CreateTracerProvider(ctx, tracerSettings, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create tracer provider: %w", err)
}
defer func() {
if resultErr != nil {
resultErr = multierr.Append(resultErr, tracerProvider.Shutdown(ctx))
}
}()
srv.tracerProvider = tracerProvider

resource, err := telemetryFactory.CreateResource(ctx, telemetrySettings, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

srv.telemetrySettings = component.TelemetrySettings{
Logger: logger,
MeterProvider: telProviders.MeterProvider(),
TracerProvider: telProviders.TracerProvider(),
Resource: telProviders.Resource(),
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
Resource: resource,
}
srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
Expand Down Expand Up @@ -255,8 +289,14 @@ func (srv *Service) Shutdown(ctx context.Context) error {

srv.telemetrySettings.Logger.Info("Shutdown complete.")

if err := srv.telemetryProviders.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry providers: %w", err))
if err := srv.loggerProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown logger provider: %w", err))
}
if err := srv.meterProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown meter provider: %w", err))
}
if err := srv.tracerProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown tracer provider: %w", err))
}

return errs
Expand Down
98 changes: 72 additions & 26 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,17 @@ func TestServiceTelemetryShutdownError(t *testing.T) {
},
},
}}
cfg.Telemetry.Metrics.Level = configtelemetry.LevelDetailed
cfg.Telemetry.Metrics.Readers = []config.MetricReader{{
Periodic: &config.PeriodicMetricReader{
Exporter: config.PushMetricExporter{
OTLP: &config.OTLPMetric{
Protocol: ptr("http/protobuf"),
Endpoint: ptr("http://testing.invalid"),
},
},
},
}}

// Create and start a service
srv, err := New(context.Background(), newNopSettings(), cfg)
Expand All @@ -553,7 +564,8 @@ func TestServiceTelemetryShutdownError(t *testing.T) {

// Shutdown the service
err = srv.Shutdown(context.Background())
assert.ErrorContains(t, err, `failed to shutdown telemetry`)
require.ErrorContains(t, err, `failed to shutdown logger provider`)
require.ErrorContains(t, err, `failed to shutdown meter provider`)
}

func TestExtensionNotificationFailure(t *testing.T) {
Expand Down Expand Up @@ -635,40 +647,74 @@ func TestServiceFatalError(t *testing.T) {
func TestServiceInvalidTelemetryConfiguration(t *testing.T) {
tests := []struct {
name string
wantErr error
wantErr string
cfg otelconftelemetry.Config
}{
{
name: "log config with processors and invalid config",
cfg: otelconftelemetry.Config{
Logs: otelconftelemetry.LogsConfig{
Encoding: "console",
Processors: []config.LogRecordProcessor{
{
Batch: &config.BatchLogRecordProcessor{
Exporter: config.LogRecordExporter{
OTLP: &config.OTLP{},
},
},
cfg: func() otelconftelemetry.Config {
cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config)
cfg.Logs.Level = zapcore.FatalLevel
cfg.Logs.Processors = []config.LogRecordProcessor{{
Batch: &config.BatchLogRecordProcessor{
Exporter: config.LogRecordExporter{
OTLP: &config.OTLP{},
},
},
},
},
wantErr: errors.New("no valid log exporter"),
}}
return *cfg
}(),
wantErr: "failed to create logger: no valid log exporter",
},
{
name: "invalid metric reader",
cfg: func() otelconftelemetry.Config {
cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config)
cfg.Logs.Level = zapcore.FatalLevel
cfg.Metrics.Level = configtelemetry.LevelDetailed
cfg.Metrics.Readers = []config.MetricReader{{
Periodic: &config.PeriodicMetricReader{
Exporter: config.PushMetricExporter{
OTLP: &config.OTLPMetric{},
},
},
}}
return *cfg
}(),
wantErr: "failed to create meter provider: no valid metric exporter",
},
{
name: "invalid trace exporter",
cfg: func() otelconftelemetry.Config {
cfg := otelconftelemetry.NewFactory().CreateDefaultConfig().(*otelconftelemetry.Config)
cfg.Logs.Level = zapcore.FatalLevel
cfg.Traces.Level = configtelemetry.LevelDetailed
cfg.Traces.Processors = []config.SpanProcessor{{
Batch: &config.BatchSpanProcessor{
Exporter: config.SpanExporter{
OTLP: &config.OTLP{},
},
},
}}
return *cfg
}(),
wantErr: "failed to create tracer provider: no valid span exporter",
},
}
for _, tt := range tests {
set := newNopSettings()
set.AsyncErrorChannel = make(chan error)

cfg := newNopConfig()
cfg.Telemetry = tt.cfg
_, err := New(context.Background(), set, cfg)
if tt.wantErr != nil {
require.ErrorContains(t, err, tt.wantErr.Error())
} else {
require.NoError(t, err)
}
t.Run(tt.name, func(t *testing.T) {
set := newNopSettings()
set.AsyncErrorChannel = make(chan error)

cfg := newNopConfig()
cfg.Telemetry = tt.cfg
_, err := New(context.Background(), set, cfg)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
})
}
}

Expand Down
8 changes: 7 additions & 1 deletion service/telemetry/otelconftelemetry/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ var useLocalHostAsDefaultMetricsAddressFeatureGate = featuregate.GlobalRegistry(
// NewFactory creates a new telemetry.Factory that uses otelconf
// to configure opentelemetry-go SDK telemetry providers.
func NewFactory() telemetry.Factory {
return telemetry.NewFactory(createDefaultConfig, createProviders)
return telemetry.NewFactory(
createDefaultConfig,
telemetry.WithCreateResource(createResource),
telemetry.WithCreateLogger(createLogger),
telemetry.WithCreateMeterProvider(createMeterProvider),
telemetry.WithCreateTracerProvider(createTracerProvider),
)
}

func createDefaultConfig() component.Config {
Expand Down
4 changes: 2 additions & 2 deletions service/telemetry/otelconftelemetry/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
// createLogger creates a Logger and a LoggerProvider from Config.
func createLogger(
ctx context.Context,
set telemetry.Settings,
set telemetry.LoggerSettings,
componentConfig component.Config,
) (*zap.Logger, telemetry.LoggerProvider, error) {
cfg := componentConfig.(*Config)
res := newResource(set, cfg)
res := newResource(set.Settings, cfg)

// Copied from NewProductionConfig.
ec := zap.NewProductionEncoderConfig()
Expand Down
11 changes: 5 additions & 6 deletions service/telemetry/otelconftelemetry/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ func TestCreateLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buildInfo := component.BuildInfo{}

_, provider, err := createLogger(context.Background(), telemetry.Settings{
BuildInfo: buildInfo,
_, provider, err := NewFactory().CreateLogger(context.Background(), telemetry.LoggerSettings{
Settings: telemetry.Settings{BuildInfo: buildInfo},
}, &tt.cfg)
if tt.wantErr != nil {
require.ErrorContains(t, err, tt.wantErr.Error())
Expand Down Expand Up @@ -222,8 +221,8 @@ func TestCreateLoggerWithResource(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
core, observedLogs := observer.New(zapcore.DebugLevel)
set := telemetry.Settings{
BuildInfo: tt.buildInfo,
set := telemetry.LoggerSettings{
Settings: telemetry.Settings{BuildInfo: tt.buildInfo},
ZapOptions: []zap.Option{
// Redirect logs to the observer core
zap.WrapCore(func(zapcore.Core) zapcore.Core { return core }),
Expand Down Expand Up @@ -348,7 +347,7 @@ func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.Handl
},
}

_, loggerProvider, err := createLogger(t.Context(), telemetry.Settings{}, cfg)
_, loggerProvider, err := createLogger(t.Context(), telemetry.LoggerSettings{}, cfg)
require.NoError(t, err)
return loggerProvider
}
Expand Down
8 changes: 3 additions & 5 deletions service/telemetry/otelconftelemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -17,19 +16,18 @@ import (

func createMeterProvider(
ctx context.Context,
set telemetry.Settings,
set telemetry.MeterSettings,
componentConfig component.Config,
logger *zap.Logger,
) (telemetry.MeterProvider, error) {
cfg := componentConfig.(*Config)
if cfg.Metrics.Level == configtelemetry.LevelNone {
logger.Info("Internal metrics telemetry disabled")
set.Logger.Info("Internal metrics telemetry disabled")
return noopMeterProvider{MeterProvider: noopmetric.NewMeterProvider()}, nil
} else if cfg.Metrics.Views == nil && set.DefaultViews != nil {
cfg.Metrics.Views = set.DefaultViews(cfg.Metrics.Level)
}

res := newResource(set, cfg)
res := newResource(set.Settings, cfg)
mpConfig := cfg.Metrics.MeterProvider
sdk, err := newSDK(ctx, res, config.OpenTelemetryConfiguration{
MeterProvider: &mpConfig,
Expand Down
Loading