Skip to content

Commit 93e2ec4

Browse files
authored
Merge branch 'main' into 1618-ForceFlushAborts
2 parents 8bc967d + 3947cab commit 93e2ec4

File tree

14 files changed

+1288
-155
lines changed

14 files changed

+1288
-155
lines changed

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,26 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
88

99
## [Unreleased]
1010

11+
### Added
12+
13+
- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758)
14+
- `OTEL_EXPORTER_OTLP_ENDPOINT`
15+
- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`
16+
- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`
17+
- `OTEL_EXPORTER_OTLP_HEADERS`
18+
- `OTEL_EXPORTER_OTLP_TRACES_HEADERS`
19+
- `OTEL_EXPORTER_OTLP_METRICS_HEADERS`
20+
- `OTEL_EXPORTER_OTLP_COMPRESSION`
21+
- `OTEL_EXPORTER_OTLP_TRACES_COMPRESSION`
22+
- `OTEL_EXPORTER_OTLP_METRICS_COMPRESSION`
23+
- `OTEL_EXPORTER_OTLP_TIMEOUT`
24+
- `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`
25+
- `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`
1126
### Fixed
1227

1328
- The `Span.IsRecording` implementation from `go.opentelemetry.io/otel/sdk/trace` always returns false when not being sampled. (#1750)
29+
- The Jaeger exporter now correctly sets tags for the Span status code and message.
30+
This means it uses the correct tag keys (`"otel.status_code"`, `"otel.status_description"`) and does not set the status message as a tag unless it is set on the span. (#1761)
1431

1532
### Changed
1633

exporters/otlp/otlphttp/driver.go

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/rand"
2525
"net"
2626
"net/http"
27+
"os"
2728
"path"
2829
"strings"
2930
"time"
@@ -62,30 +63,33 @@ var ourTransport *http.Transport = &http.Transport{
6263
}
6364

6465
type driver struct {
65-
client *http.Client
66-
cfg config
66+
metricsDriver signalDriver
67+
tracesDriver signalDriver
68+
cfg config
6769

6870
stopCh chan struct{}
6971
}
7072

73+
type signalDriver struct {
74+
cfg signalConfig
75+
generalCfg config
76+
client *http.Client
77+
stopCh chan struct{}
78+
}
79+
7180
var _ otlp.ProtocolDriver = (*driver)(nil)
7281

7382
// NewDriver creates a new HTTP driver.
7483
func NewDriver(opts ...Option) otlp.ProtocolDriver {
75-
cfg := config{
76-
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
77-
compression: NoCompression,
78-
tracesURLPath: DefaultTracesPath,
79-
metricsURLPath: DefaultMetricsPath,
80-
maxAttempts: DefaultMaxAttempts,
81-
backoff: DefaultBackoff,
82-
}
84+
cfg := newDefaultConfig()
85+
applyEnvConfigs(&cfg, os.Getenv)
86+
8387
for _, opt := range opts {
8488
opt.Apply(&cfg)
8589
}
8690
for pathPtr, defaultPath := range map[*string]string{
87-
&cfg.tracesURLPath: DefaultTracesPath,
88-
&cfg.metricsURLPath: DefaultMetricsPath,
91+
&cfg.traces.urlPath: DefaultTracesPath,
92+
&cfg.metrics.urlPath: DefaultMetricsPath,
8993
} {
9094
tmp := strings.TrimSpace(*pathPtr)
9195
if tmp == "" {
@@ -107,18 +111,43 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver {
107111
if cfg.backoff <= 0 {
108112
cfg.backoff = DefaultBackoff
109113
}
110-
client := &http.Client{
114+
115+
metricsClient := &http.Client{
116+
Transport: ourTransport,
117+
Timeout: cfg.metrics.timeout,
118+
}
119+
if cfg.metrics.tlsCfg != nil {
120+
transport := ourTransport.Clone()
121+
transport.TLSClientConfig = cfg.metrics.tlsCfg
122+
metricsClient.Transport = transport
123+
}
124+
125+
tracesClient := &http.Client{
111126
Transport: ourTransport,
127+
Timeout: cfg.traces.timeout,
112128
}
113-
if cfg.tlsCfg != nil {
129+
if cfg.traces.tlsCfg != nil {
114130
transport := ourTransport.Clone()
115-
transport.TLSClientConfig = cfg.tlsCfg
116-
client.Transport = transport
131+
transport.TLSClientConfig = cfg.traces.tlsCfg
132+
tracesClient.Transport = transport
117133
}
134+
135+
stopCh := make(chan struct{})
118136
return &driver{
119-
client: client,
137+
tracesDriver: signalDriver{
138+
cfg: cfg.traces,
139+
generalCfg: cfg,
140+
stopCh: stopCh,
141+
client: tracesClient,
142+
},
143+
metricsDriver: signalDriver{
144+
cfg: cfg.metrics,
145+
generalCfg: cfg,
146+
stopCh: stopCh,
147+
client: metricsClient,
148+
},
120149
cfg: cfg,
121-
stopCh: make(chan struct{}),
150+
stopCh: stopCh,
122151
}
123152
}
124153

@@ -150,7 +179,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
150179
if err != nil {
151180
return err
152181
}
153-
return d.send(ctx, rawRequest, d.cfg.metricsURLPath)
182+
return d.metricsDriver.send(ctx, rawRequest)
154183
}
155184

156185
// ExportTraces implements otlp.ProtocolDriver.
@@ -166,7 +195,7 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
166195
if err != nil {
167196
return err
168197
}
169-
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
198+
return d.tracesDriver.send(ctx, rawRequest)
170199
}
171200

172201
func (d *driver) marshal(msg proto.Message) ([]byte, error) {
@@ -176,12 +205,12 @@ func (d *driver) marshal(msg proto.Message) ([]byte, error) {
176205
return proto.Marshal(msg)
177206
}
178207

179-
func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
180-
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
208+
func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error {
209+
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, d.cfg.urlPath)
181210
var cancel context.CancelFunc
182211
ctx, cancel = d.contextWithStop(ctx)
183212
defer cancel()
184-
for i := 0; i < d.cfg.maxAttempts; i++ {
213+
for i := 0; i < d.generalCfg.maxAttempts; i++ {
185214
response, err := d.singleSend(ctx, rawRequest, address)
186215
if err != nil {
187216
return err
@@ -198,7 +227,7 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
198227
fallthrough
199228
case http.StatusServiceUnavailable:
200229
select {
201-
case <-time.After(getWaitDuration(d.cfg.backoff, i)):
230+
case <-time.After(getWaitDuration(d.generalCfg.backoff, i)):
202231
continue
203232
case <-ctx.Done():
204233
return ctx.Err()
@@ -207,10 +236,10 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
207236
return fmt.Errorf("failed with HTTP status %s", response.Status)
208237
}
209238
}
210-
return fmt.Errorf("failed to send data to %s after %d tries", address, d.cfg.maxAttempts)
239+
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.maxAttempts)
211240
}
212241

213-
func (d *driver) getScheme() string {
242+
func (d *signalDriver) getScheme() string {
214243
if d.cfg.insecure {
215244
return "http"
216245
}
@@ -237,7 +266,7 @@ func getWaitDuration(backoff time.Duration, i int) time.Duration {
237266
return (time.Duration)(k)*backoff + (time.Duration)(jitter)
238267
}
239268

240-
func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
269+
func (d *signalDriver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
241270
// Unify the parent context Done signal with the driver's stop
242271
// channel.
243272
ctx, cancel := context.WithCancel(ctx)
@@ -253,7 +282,7 @@ func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.
253282
return ctx, cancel
254283
}
255284

256-
func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
285+
func (d *signalDriver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
257286
request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil)
258287
if err != nil {
259288
return nil, err
@@ -271,14 +300,14 @@ func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address stri
271300
return d.client.Do(request)
272301
}
273302

274-
func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
303+
func (d *signalDriver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
275304
var bodyReader io.ReadCloser
276305
headers := http.Header{}
277306
for k, v := range d.cfg.headers {
278307
headers.Set(k, v)
279308
}
280309
contentLength := (int64)(len(rawRequest))
281-
if d.cfg.marshaler == MarshalJSON {
310+
if d.generalCfg.marshaler == MarshalJSON {
282311
headers.Set("Content-Type", contentTypeJSON)
283312
} else {
284313
headers.Set("Content-Type", contentTypeProto)

exporters/otlp/otlphttp/driver_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package otlphttp_test
1717
import (
1818
"context"
1919
"net/http"
20+
"os"
2021
"testing"
2122
"time"
2223

@@ -167,6 +168,27 @@ func TestRetry(t *testing.T) {
167168
assert.Len(t, mc.GetSpans(), 1)
168169
}
169170

171+
func TestTimeout(t *testing.T) {
172+
mcCfg := mockCollectorConfig{
173+
InjectDelay: 100 * time.Millisecond,
174+
}
175+
mc := runMockCollector(t, mcCfg)
176+
defer mc.MustStop(t)
177+
driver := otlphttp.NewDriver(
178+
otlphttp.WithEndpoint(mc.Endpoint()),
179+
otlphttp.WithInsecure(),
180+
otlphttp.WithTimeout(50*time.Millisecond),
181+
)
182+
ctx := context.Background()
183+
exporter, err := otlp.NewExporter(ctx, driver)
184+
require.NoError(t, err)
185+
defer func() {
186+
assert.NoError(t, exporter.Shutdown(ctx))
187+
}()
188+
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
189+
assert.Equal(t, true, os.IsTimeout(err))
190+
}
191+
170192
func TestRetryFailed(t *testing.T) {
171193
statuses := []int{
172194
http.StatusTooManyRequests,
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package otlphttp
16+
17+
import (
18+
"fmt"
19+
"net/url"
20+
"strconv"
21+
"strings"
22+
"time"
23+
)
24+
25+
func applyEnvConfigs(cfg *config, getEnv func(string) string) *config {
26+
opts := getOptionsFromEnv(getEnv)
27+
for _, opt := range opts {
28+
opt.Apply(cfg)
29+
}
30+
return cfg
31+
}
32+
33+
func getOptionsFromEnv(env func(string) string) []Option {
34+
var opts []Option
35+
36+
// Endpoint
37+
if v, ok := getEnv(env, "ENDPOINT"); ok {
38+
opts = append(opts, WithEndpoint(v))
39+
}
40+
if v, ok := getEnv(env, "TRACES_ENDPOINT"); ok {
41+
opts = append(opts, WithTracesEndpoint(v))
42+
}
43+
if v, ok := getEnv(env, "METRICS_ENDPOINT"); ok {
44+
opts = append(opts, WithMetricsEndpoint(v))
45+
}
46+
47+
// Certificate File
48+
// TODO: add certificate file env config support
49+
50+
// Headers
51+
if h, ok := getEnv(env, "HEADERS"); ok {
52+
opts = append(opts, WithHeaders(stringToHeader(h)))
53+
}
54+
if h, ok := getEnv(env, "TRACES_HEADERS"); ok {
55+
opts = append(opts, WithTracesHeaders(stringToHeader(h)))
56+
}
57+
if h, ok := getEnv(env, "METRICS_HEADERS"); ok {
58+
opts = append(opts, WithMetricsHeaders(stringToHeader(h)))
59+
}
60+
61+
// Compression
62+
if c, ok := getEnv(env, "COMPRESSION"); ok {
63+
opts = append(opts, WithCompression(stringToCompression(c)))
64+
}
65+
if c, ok := getEnv(env, "TRACES_COMPRESSION"); ok {
66+
opts = append(opts, WithTracesCompression(stringToCompression(c)))
67+
}
68+
if c, ok := getEnv(env, "METRICS_COMPRESSION"); ok {
69+
opts = append(opts, WithMetricsCompression(stringToCompression(c)))
70+
}
71+
72+
// Timeout
73+
if t, ok := getEnv(env, "TIMEOUT"); ok {
74+
if d, err := strconv.Atoi(t); err == nil {
75+
opts = append(opts, WithTimeout(time.Duration(d)*time.Millisecond))
76+
}
77+
}
78+
if t, ok := getEnv(env, "TRACES_TIMEOUT"); ok {
79+
if d, err := strconv.Atoi(t); err == nil {
80+
opts = append(opts, WithTracesTimeout(time.Duration(d)*time.Millisecond))
81+
}
82+
}
83+
if t, ok := getEnv(env, "METRICS_TIMEOUT"); ok {
84+
if d, err := strconv.Atoi(t); err == nil {
85+
opts = append(opts, WithMetricsTimeout(time.Duration(d)*time.Millisecond))
86+
}
87+
}
88+
89+
return opts
90+
}
91+
92+
// getEnv gets an OTLP environment variable value of the specified key using the env function.
93+
// This function already prepends the OTLP prefix to all key lookup.
94+
func getEnv(env func(string) string, key string) (string, bool) {
95+
v := strings.TrimSpace(env(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
96+
return v, v != ""
97+
}
98+
99+
func stringToCompression(value string) Compression {
100+
switch value {
101+
case "gzip":
102+
return GzipCompression
103+
}
104+
105+
return NoCompression
106+
}
107+
108+
func stringToHeader(value string) map[string]string {
109+
headersPairs := strings.Split(value, ",")
110+
headers := make(map[string]string)
111+
112+
for _, header := range headersPairs {
113+
nameValue := strings.SplitN(header, "=", 2)
114+
if len(nameValue) < 2 {
115+
continue
116+
}
117+
name, err := url.QueryUnescape(nameValue[0])
118+
if err != nil {
119+
continue
120+
}
121+
trimmedName := strings.TrimSpace(name)
122+
value, err := url.QueryUnescape(nameValue[1])
123+
if err != nil {
124+
continue
125+
}
126+
trimmedValue := strings.TrimSpace(value)
127+
128+
headers[trimmedName] = trimmedValue
129+
}
130+
131+
return headers
132+
}

0 commit comments

Comments
 (0)