Skip to content

Commit 336677d

Browse files
authored
Support multiple process handling in otelsdk (#2016)
* Support multiple process handling in otelsdk * Add example * Add a changelog entry
1 parent 183ed1a commit 336677d

File tree

5 files changed

+187
-18
lines changed

5 files changed

+187
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
3232
- Cache offsets for Go `1.24.2`. ([#2081](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/2081))
3333
- Cache offsets for `google.golang.org/grpc` `1.73.0-dev`. ([#2091](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/2091))
3434
- Cache offsets for `golang.org/x/net` `0.39.0`. ([#2107](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/2107))
35+
- The new `Multiplexer` type is added to `go.opentelemetry.io/auto/pipeline/otelsdk`.
36+
This type is used to support multiple process instrumentation using the same telemetry pipeline. ([#2016](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/2016))
3537

3638
### Changed
3739

pipeline/otelsdk/config.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,13 +172,17 @@ type config struct {
172172
logger *slog.Logger
173173
exporter sdk.SpanExporter
174174
resAttrs []attribute.KeyValue
175+
176+
spanProcessor sdk.SpanProcessor
177+
idGenerator *idGenerator
175178
}
176179

177180
func newConfig(ctx context.Context, options []Option) (config, error) {
178181
c := config{
179182
resAttrs: []attribute.KeyValue{
180183
semconv.ServiceName(defaultServiceName()),
181184
},
185+
idGenerator: newIDGenerator(),
182186
}
183187

184188
var err error
@@ -188,6 +192,15 @@ func newConfig(ctx context.Context, options []Option) (config, error) {
188192
err = errors.Join(err, e)
189193
}
190194

195+
if c.exporter == nil {
196+
var e error
197+
c.exporter, e = otlptracehttp.New(ctx)
198+
if e != nil {
199+
err = errors.Join(err, e)
200+
}
201+
}
202+
c.spanProcessor = sdk.NewBatchSpanProcessor(c.exporter)
203+
191204
return c, err
192205
}
193206

@@ -206,24 +219,15 @@ func (c config) Logger() *slog.Logger {
206219
return newLogger(nil)
207220
}
208221

209-
func (c config) TracerProvider(ctx context.Context) (*sdk.TracerProvider, error) {
210-
exp := c.exporter
211-
if exp == nil {
212-
var err error
213-
exp, err = otlptracehttp.New(ctx)
214-
if err != nil {
215-
return nil, err
216-
}
217-
}
218-
222+
func (c config) TracerProvider() *sdk.TracerProvider {
219223
return sdk.NewTracerProvider(
220224
// Sample everything. The actual sampling is done in the eBPF probes
221225
// before it reaches this tracerProvider.
222226
sdk.WithSampler(sdk.AlwaysSample()),
223227
sdk.WithResource(c.resource()),
224-
sdk.WithBatcher(exp),
225-
sdk.WithIDGenerator(newIDGenerator()),
226-
), nil
228+
sdk.WithSpanProcessor(c.spanProcessor),
229+
sdk.WithIDGenerator(c.idGenerator),
230+
)
227231
}
228232

229233
func (c config) resource() *resource.Resource {

pipeline/otelsdk/example_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otelsdk_test
5+
6+
import (
7+
"context"
8+
"os/signal"
9+
"sync"
10+
"syscall"
11+
12+
"go.opentelemetry.io/auto"
13+
"go.opentelemetry.io/auto/pipeline/otelsdk"
14+
)
15+
16+
func Example_multiplex() {
17+
// Create a context that cancels when a SIGTERM is received. This ensures
18+
// that each instrumentation goroutine below can shut down cleanly.
19+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
20+
defer stop()
21+
22+
// Create a new multiplexer to handle instrumentation events from multiple
23+
// sources. This will act as a central router for telemetry handlers.
24+
m, err := otelsdk.NewMultiplexer(ctx)
25+
if err != nil {
26+
panic(err)
27+
}
28+
29+
// Simulated process IDs to be instrumented. These would typically be real
30+
// process IDs in a production scenario.
31+
pids := []int{1297, 1331, 9827}
32+
33+
var wg sync.WaitGroup
34+
for _, pid := range pids {
35+
wg.Add(1)
36+
37+
go func(id int) {
38+
defer wg.Done()
39+
40+
// Create a new instrumentation session for the process.
41+
//
42+
// NOTE: Error handling is omitted here for brevity. In production
43+
// code, always check and handle errors.
44+
inst, _ := auto.NewInstrumentation(
45+
ctx,
46+
auto.WithPID(id),
47+
auto.WithHandler(m.Handler(id)),
48+
)
49+
50+
// Load and start the instrumentation for the process.
51+
_ = inst.Load(ctx)
52+
_ = inst.Run(ctx)
53+
}(pid)
54+
}
55+
56+
// Wait for all instrumentation goroutines to complete.
57+
wg.Wait()
58+
59+
// Shut down the multiplexer, cleaning up any remaining resources.
60+
_ = m.Shutdown(ctx)
61+
}

pipeline/otelsdk/handler.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,11 @@ func NewTraceHandler(ctx context.Context, options ...Option) (*TraceHandler, err
5353
return nil, err
5454
}
5555

56-
tp, err := c.TracerProvider(ctx)
57-
if err != nil {
58-
return nil, err
59-
}
56+
return newTraceHandler(c), nil
57+
}
6058

61-
return &TraceHandler{logger: c.Logger(), tracerProvider: tp}, nil
59+
func newTraceHandler(c config) *TraceHandler {
60+
return &TraceHandler{logger: c.Logger(), tracerProvider: c.TracerProvider()}
6261
}
6362

6463
// HandleTrace the passed telemetry using the default OpenTelemetry Go SDK.

pipeline/otelsdk/multiplex.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otelsdk
5+
6+
import (
7+
"context"
8+
"debug/buildinfo"
9+
"strconv"
10+
11+
"go.opentelemetry.io/otel/attribute"
12+
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
13+
14+
"go.opentelemetry.io/auto/pipeline"
15+
)
16+
17+
// Multiplexer supports sending telemetry from multiple resources through the
18+
// same processing and exporting pipeline.
19+
type Multiplexer struct {
20+
cfg config
21+
}
22+
23+
// NewMultiplexer returns a new *Multiplexer that reuses the provided options as
24+
// a base configuration for all [pipeline.Handler] it creates.
25+
//
26+
// Any exporter provided in the options must support exporting telemetry for
27+
// multiple resources.
28+
func NewMultiplexer(ctx context.Context, options ...Option) (*Multiplexer, error) {
29+
cfg, err := newConfig(ctx, options)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return &Multiplexer{cfg: cfg}, nil
34+
}
35+
36+
// Handler returns a new [pipeline.Handler] configured with additional
37+
// resource attributes that follow OpenTelemetry semantic conventions for the
38+
// process associated with the given pid.
39+
//
40+
// If an error occurs while determining process-specific resource attributes,
41+
// the error is logged, and a handler without those attributes is returned.
42+
//
43+
// If Shutdown has already been called on the Multiplexer, the returned handler
44+
// will also be in a shut down state and will not export any telemetry.
45+
func (m Multiplexer) Handler(pid int) *pipeline.Handler {
46+
c := m.withProcResAttrs(pid)
47+
return &pipeline.Handler{TraceHandler: newTraceHandler(c)}
48+
}
49+
50+
// Shutdown gracefully shuts down the Multiplexer's span processor.
51+
//
52+
// After Shutdown is called, any subsequent calls to Handler will return a
53+
// handler that is in a shut down state. These handlers will silently drop
54+
// telemetry and will not perform any processing or exporting.
55+
func (m Multiplexer) Shutdown(ctx context.Context) error {
56+
return m.cfg.spanProcessor.Shutdown(ctx)
57+
}
58+
59+
// withProcResAttrs returns a copy of the Multiplexer's config with additional
60+
// resource attributes based on Go runtime build information for the process
61+
// identified by pid.
62+
//
63+
// It attempts to read the build info of the Go executable at /proc/<pid>/exe
64+
// and extracts semantic convention attributes such as the runtime version and
65+
// compiler name. If this fails, an error is logged and no extra attributes
66+
// are added.
67+
func (m Multiplexer) withProcResAttrs(pid int) (c config) {
68+
c = m.cfg // Make a shallow copy to modify attributes.
69+
70+
var attrs []attribute.KeyValue
71+
72+
path := "/proc/" + strconv.Itoa(pid) + "/exe"
73+
bi, err := buildinfo.ReadFile(path)
74+
if err != nil {
75+
c.logger.Error("failed to get Go proc build info", "error", err)
76+
return c
77+
}
78+
79+
// Add Go runtime version as a semantic attribute.
80+
attrs = append(attrs, semconv.ProcessRuntimeVersion(bi.GoVersion))
81+
82+
// Try to determine which Go compiler was used.
83+
var compiler string
84+
for _, setting := range bi.Settings {
85+
if setting.Key == "-compiler" {
86+
compiler = setting.Value
87+
break
88+
}
89+
}
90+
switch compiler {
91+
case "":
92+
c.logger.Debug("failed to identify Go compiler")
93+
case "gc":
94+
attrs = append(attrs, semconv.ProcessRuntimeName("go"))
95+
default:
96+
attrs = append(attrs, semconv.ProcessRuntimeName(compiler))
97+
}
98+
99+
// Prepend process-specific attributes so user-provided ones have priority.
100+
c.resAttrs = append(attrs, c.resAttrs...)
101+
102+
return c
103+
}

0 commit comments

Comments
 (0)