Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions service/telemetry/otelconftelemetry/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"

import (
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/resource"
"context"

otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/telemetry"
)

// newLogger creates a Logger and a LoggerProvider from Config.
func newLogger(set telemetry.Settings, cfg *Config, sdk *config.SDK, res *resource.Resource) (*zap.Logger, log.LoggerProvider, error) {
// createLogger creates a Logger and a LoggerProvider from Config.
func createLogger(
ctx context.Context,
set telemetry.Settings,
componentConfig component.Config,
) (*zap.Logger, telemetry.LoggerProvider, error) {
cfg := componentConfig.(*Config)
res := newResource(set, cfg)

// Copied from NewProductionConfig.
ec := zap.NewProductionEncoderConfig()
ec.EncodeTime = zapcore.ISO8601TimeEncoder
Expand Down Expand Up @@ -70,6 +78,13 @@ func newLogger(set telemetry.Settings, cfg *Config, sdk *config.SDK, res *resour
}))
}

lp := sdk.LoggerProvider()
return logger, lp, nil
sdk, err := newSDK(ctx, res, otelconf.OpenTelemetryConfiguration{
LoggerProvider: &otelconf.LoggerProvider{
Processors: cfg.Logs.Processors,
},
})
if err != nil {
return nil, nil, err
}
return logger, sdk.LoggerProvider().(telemetry.LoggerProvider), nil
}
71 changes: 55 additions & 16 deletions service/telemetry/otelconftelemetry/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
Expand All @@ -31,7 +33,7 @@ const (
testValue = "test-value"
)

func TestNewLogger(t *testing.T) {
func TestCreateLogger(t *testing.T) {
tests := []struct {
name string
wantCoreType any
Expand All @@ -43,6 +45,29 @@ func TestNewLogger(t *testing.T) {
cfg: Config{},
wantErr: errors.New("no encoder name specified"),
},
{
name: "log config with invalid processors",
cfg: Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Development: true,
Encoding: "console",
DisableCaller: true,
DisableStacktrace: true,
InitialFields: map[string]any{"fieldKey": "filed-value"},
Processors: []config.LogRecordProcessor{
{
Batch: &config.BatchLogRecordProcessor{
Exporter: config.LogRecordExporter{
OTLP: &config.OTLP{}, // missing required fields
},
},
},
},
},
},
wantErr: errors.New("no valid log exporter"),
},
{
name: "log config with no processors",
cfg: Config{
Expand Down Expand Up @@ -103,20 +128,21 @@ func TestNewLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buildInfo := component.BuildInfo{}
factory := NewFactory()

providers, err := factory.CreateProviders(context.Background(), telemetry.Settings{BuildInfo: buildInfo}, &tt.cfg)
_, provider, err := createLogger(context.Background(), telemetry.Settings{
BuildInfo: buildInfo,
}, &tt.cfg)
if tt.wantErr != nil {
require.ErrorContains(t, err, tt.wantErr.Error())
} else {
require.NoError(t, err)
require.NoError(t, providers.Shutdown(context.Background()))
require.NoError(t, provider.Shutdown(context.Background()))
}
})
}
}

func TestNewLoggerWithResource(t *testing.T) {
func TestCreateLoggerWithResource(t *testing.T) {
tests := []struct {
name string
buildInfo component.BuildInfo
Expand Down Expand Up @@ -195,18 +221,29 @@ func TestNewLoggerWithResource(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
set := telemetry.Settings{BuildInfo: tt.buildInfo}
cfg := Config{
core, observedLogs := observer.New(zapcore.DebugLevel)
set := telemetry.Settings{
BuildInfo: tt.buildInfo,
ZapOptions: []zap.Option{
// Redirect logs to the observer core
zap.WrapCore(func(zapcore.Core) zapcore.Core { return core }),
},
}
cfg := &Config{
Logs: LogsConfig{
Level: zapcore.InfoLevel,
Encoding: "json",
},
Resource: tt.resourceConfig,
}

tel, observedLogs := newTelemetryProviders(t, set, &cfg)
mylogger := tel.Logger()
mylogger.Info("Test log message")
logger, loggerProvider, err := createLogger(t.Context(), set, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, loggerProvider.Shutdown(t.Context()))
}()

logger.Info("Test log message")
require.Len(t, observedLogs.All(), 1)

entry := observedLogs.All()[0]
Expand Down Expand Up @@ -266,6 +303,9 @@ func TestLoggerProvider(t *testing.T) {

receivedLogs++
})
defer func() {
assert.NoError(t, lp.Shutdown(t.Context()))
}()

// Generate some logs to send to the backend
logger := lp.Logger("name")
Expand All @@ -279,7 +319,7 @@ func TestLoggerProvider(t *testing.T) {
require.Equal(t, totalLogs, receivedLogs)
}

func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.HandlerFunc) log.LoggerProvider {
func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.HandlerFunc) telemetry.LoggerProvider {
srv := createBackend("/v1/logs", handler)
t.Cleanup(srv.Close)

Expand All @@ -295,7 +335,7 @@ func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.Handl
},
}}

cfg := Config{
cfg := &Config{
Logs: LogsConfig{
Level: level,
Encoding: "json",
Expand All @@ -308,10 +348,9 @@ func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.Handl
},
}

tel, _ := newTelemetryProviders(t, telemetry.Settings{}, &cfg)
lp := tel.LoggerProvider()
require.NotNil(t, lp)
return lp
_, loggerProvider, err := createLogger(t.Context(), telemetry.Settings{}, cfg)
require.NoError(t, err)
return loggerProvider
}

func createBackend(endpoint string, handler func(http.ResponseWriter, *http.Request)) *httptest.Server {
Expand Down
49 changes: 49 additions & 0 deletions service/telemetry/otelconftelemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"

import (
"context"

config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/telemetry"
)

func createMeterProvider(
ctx context.Context,
set telemetry.Settings,
componentConfig component.Config,
logger *zap.Logger,
) (telemetry.MeterProvider, error) {
cfg := componentConfig.(*Config)
if cfg.Metrics.Level == configtelemetry.LevelNone {
logger.Info("Internal metrics telemetry disabled")
return noopMeterProvider{MeterProvider: noopmetric.NewMeterProvider()}, nil
} else if cfg.Metrics.Views == nil && set.DefaultViews != nil {
cfg.Metrics.Views = set.DefaultViews(cfg.Metrics.Level)
}

res := newResource(set, cfg)
mpConfig := cfg.Metrics.MeterProvider
sdk, err := newSDK(ctx, res, config.OpenTelemetryConfiguration{
MeterProvider: &mpConfig,
})
if err != nil {
return nil, err
}
return sdk.MeterProvider().(telemetry.MeterProvider), nil
}

type noopMeterProvider struct {
noopmetric.MeterProvider
}

func (noopMeterProvider) Shutdown(context.Context) error {
return nil
}
47 changes: 36 additions & 11 deletions service/telemetry/otelconftelemetry/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -38,7 +41,7 @@ const (

var testInstanceID = "test_instance_id"

func TestTelemetryInit(t *testing.T) {
func TestCreateMeterProvider(t *testing.T) {
type metricValue struct {
value float64
labels map[string]string
Expand Down Expand Up @@ -115,8 +118,11 @@ func TestTelemetryInit(t *testing.T) {
}

t.Run(tt.name, func(t *testing.T) {
providers, _ := newTelemetryProviders(t, telemetry.Settings{}, cfg)
mp := providers.MeterProvider()
mp, err := createMeterProvider(t.Context(), telemetry.Settings{}, cfg, zap.NewNop())
require.NoError(t, err)
defer func() {
assert.NoError(t, mp.Shutdown(t.Context()))
}()

createTestMetrics(t, mp)

Expand Down Expand Up @@ -194,21 +200,37 @@ func getMetricsFromPrometheus(t *testing.T, endpoint string) map[string]*io_prom
return parsed
}

func TestTelemetryMetricsDisabled(t *testing.T) {
func TestCreateMeterProvider_Invalid(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Logs.Level = zapcore.FatalLevel
cfg.Traces.Level = configtelemetry.LevelNone
cfg.Metrics.Readers = []config.MetricReader{{
// Invalid -- no OTLP protocol defined
Periodic: &config.PeriodicMetricReader{Exporter: config.PushMetricExporter{OTLP: &config.OTLPMetric{}}},
}}
_, err := createProviders(t.Context(), telemetry.Settings{}, cfg)
require.EqualError(t, err, "failed to create meter provider: no valid metric exporter")
}

factory := NewFactory()
_, err := factory.CreateProviders(context.Background(), telemetry.Settings{}, cfg)
require.EqualError(t, err, "failed to create SDK: no valid metric exporter")

func TestCreateMeterProvider_Disabled(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Metrics.Readers = []config.MetricReader{{
// Invalid -- no OTLP protocol defined
Periodic: &config.PeriodicMetricReader{Exporter: config.PushMetricExporter{OTLP: &config.OTLPMetric{}}},
}}
// Setting Metrics.Level to LevelNone disables metrics,
// so the invalid configuration should not cause an error.
cfg.Metrics.Level = configtelemetry.LevelNone
_, _ = newTelemetryProviders(t, telemetry.Settings{}, cfg)

core, observedLogs := observer.New(zapcore.DebugLevel)
logger := zap.New(core)

mp, err := createMeterProvider(t.Context(), telemetry.Settings{}, cfg, logger)
require.NoError(t, err)
assert.NoError(t, mp.Shutdown(t.Context()))

require.Equal(t, 1, observedLogs.Len())
assert.Equal(t, "Internal metrics telemetry disabled", observedLogs.All()[0].Message)
}

// Test that the MeterProvider implements the 'Enabled' functionality.
Expand All @@ -220,8 +242,11 @@ func TestInstrumentEnabled(t *testing.T) {
Pull: &config.PullMetricReader{Exporter: config.PullMetricExporter{Prometheus: prom}},
}}

providers, _ := newTelemetryProviders(t, telemetry.Settings{}, cfg)
meterProvider := providers.MeterProvider()
meterProvider, err := createMeterProvider(t.Context(), telemetry.Settings{}, cfg, zap.NewNop())
require.NoError(t, err)
defer func() {
assert.NoError(t, meterProvider.Shutdown(t.Context()))
}()

meter := meterProvider.Meter("go.opentelemetry.io/collector/service/telemetry")

Expand Down
43 changes: 43 additions & 0 deletions service/telemetry/otelconftelemetry/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
sdkresource "go.opentelemetry.io/otel/sdk/resource"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)

func createResource(
_ context.Context,
set telemetry.Settings,
componentConfig component.Config,
) (pcommon.Resource, error) { //nolint:unparam
res := newResource(set, componentConfig.(*Config))
pcommonRes := pcommon.NewResource()
for _, keyValue := range res.Attributes() {
key := string(keyValue.Key)
pcommonRes.Attributes().PutStr(key, mustAttributeValueString(key, keyValue.Value))
}
return pcommonRes, nil
}

func newResource(set telemetry.Settings, cfg *Config) *sdkresource.Resource {
return resource.New(set.BuildInfo, cfg.Resource)
}

func mustAttributeValueString(k string, v attribute.Value) string {
if v.Type() != attribute.STRING {
// We only support string-type resource attributes in the configuration.
panic(fmt.Errorf("attribute %q: expected string, got %s", k, v.Type()))
}
return v.AsString()
}
Loading