Skip to content

Commit ab10fe1

Browse files
jmichalek132dashpoleArthurSensbwplotka
authored
[exporter/prometheusremotewrite] feat: prom rw exporter add support for rw2 (#35888)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Draft PR for adding rw2 support in the prometheus remote write exporter. Very much a draft, not full implementation of the spec with a lot of code duplication and no tests WIP. TODO: - [x] changelog entry - [x] feature flag on top of config options - [x] rip out batching for now - [x] enum instead of bool for RW2 - [x] update exporter readme - [x] Validate supported enum value set - [x] Check for TODOs - [x] unit tests - [x] going over the spec and making changes to be rw2 compliant -> follow up PR - [x] try to reduce duplicate code <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue #33661 Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Juraj Michalek <[email protected]> Co-authored-by: David Ashpole <[email protected]> Co-authored-by: Arthur Silva Sens <[email protected]> Co-authored-by: Bartlomiej Plotka <[email protected]>
1 parent c2c11e6 commit ab10fe1

File tree

14 files changed

+1018
-71
lines changed

14 files changed

+1018
-71
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: prometheusremotewriteexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add `exporter.prometheusremotewritexporter.enableSendingRW2` feature gate and configuration to the exporter to send Prometheus remote write 2.0 version.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33661]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: WARNING! PRW 2.0 support for the exporter is still under development and not ready for usage.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user, api]

exporter/prometheusremotewriteexporter/README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ The following settings can be optionally configured:
5151
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
5252
- `namespace`: prefix attached to each exported metric name.
5353
- `add_metric_suffixes`: If set to false, type and unit suffixes will not be added to metrics. Default: true.
54-
- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false.
54+
- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false. This option is ignored when using PRW 2.0, which always includes metadata.
5555
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
5656
- `enabled`: enable the sending queue (default: `true`)
5757
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
@@ -64,6 +64,11 @@ The following settings can be optionally configured:
6464
samples to be sent to the remote write endpoint. If the batch size is larger
6565
than this value, it will be split into multiple batches.
6666
- `max_batch_request_parallelism` (default = `5`): Maximum parallelism allowed for a single request bigger than `max_batch_size_bytes`.
67+
- `protobuf_message` (default = `prometheus.WriteRequest`):
68+
- Protobuf message to use when writing to the remote write endpoint. This option is ignored unless the `exporter.prometheusremotewritexporter.enableSendingRW2` feature gate is enabled.
69+
- `prometheus.WriteRequest` is the message used in [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/).
70+
- `io.prometheus.write.v2.Request` is the message used in [Remote Write 2.0](https://prometheus.io/docs/specs/remote_write_spec_2_0/). It is more efficient, always includes metadata, and adds support for the created timestamp and native histograms. Your remote storage provider must support PRW 2.0 to be able to use this message. PRW 2.0 support is currently **In Development** and is only partially implemented, thus, not ready for usage.
71+
6772

6873
Example:
6974

exporter/prometheusremotewriteexporter/config.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent
55

66
import (
77
"errors"
8+
"fmt"
89

10+
"github.com/prometheus/prometheus/config"
911
"go.opentelemetry.io/collector/component"
1012
"go.opentelemetry.io/collector/config/confighttp"
1113
"go.opentelemetry.io/collector/config/configretry"
@@ -50,8 +52,11 @@ type Config struct {
5052
// AddMetricSuffixes controls whether unit and type suffixes are added to metrics on export
5153
AddMetricSuffixes bool `mapstructure:"add_metric_suffixes"`
5254

53-
// SendMetadata controls whether prometheus metadata will be generated and sent
55+
// SendMetadata controls whether prometheus metadata will be generated and sent, this option is ignored when using PRW 2.0, which always includes metadata.
5456
SendMetadata bool `mapstructure:"send_metadata"`
57+
58+
// RemoteWriteProtoMsg controls whether prometheus remote write v1 or v2 is sent.
59+
RemoteWriteProtoMsg config.RemoteWriteProtoMsg `mapstructure:"protobuf_message,omitempty"`
5560
}
5661

5762
type CreatedMetric struct {
@@ -118,5 +123,14 @@ func (cfg *Config) Validate() error {
118123
return errors.New("compression type must be snappy")
119124
}
120125

126+
err := cfg.RemoteWriteProtoMsg.Validate()
127+
if err != nil {
128+
return err
129+
}
130+
131+
if !enableSendingRW2FeatureGate.IsEnabled() && cfg.RemoteWriteProtoMsg == config.RemoteWriteProtoMsgV2 {
132+
return fmt.Errorf("remote write v2 is only supported with the feature gate %s", enableSendingRW2FeatureGate.ID())
133+
}
134+
121135
return nil
122136
}

exporter/prometheusremotewriteexporter/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/cenkalti/backoff/v4"
12+
"github.com/prometheus/prometheus/config"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
1415
"go.opentelemetry.io/collector/component"
@@ -81,6 +82,7 @@ func TestLoadConfig(t *testing.T) {
8182
TargetInfo: &TargetInfo{
8283
Enabled: true,
8384
},
85+
RemoteWriteProtoMsg: config.RemoteWriteProtoMsgV1,
8486
},
8587
},
8688
{

exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cenkalti/backoff/v4"
1919
"github.com/gogo/protobuf/proto"
2020
"github.com/golang/snappy"
21+
"github.com/prometheus/prometheus/config"
2122
"github.com/prometheus/prometheus/prompb"
2223
"go.opentelemetry.io/collector/component"
2324
"go.opentelemetry.io/collector/config/confighttp"
@@ -80,20 +81,21 @@ var bufferPool = sync.Pool{
8081

8182
// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
8283
type prwExporter struct {
83-
endpointURL *url.URL
84-
client *http.Client
85-
wg *sync.WaitGroup
86-
closeChan chan struct{}
87-
concurrency int
88-
userAgentHeader string
89-
maxBatchSizeBytes int
90-
clientSettings *confighttp.ClientConfig
91-
settings component.TelemetrySettings
92-
retrySettings configretry.BackOffConfig
93-
retryOnHTTP429 bool
94-
wal *prweWAL
95-
exporterSettings prometheusremotewrite.Settings
96-
telemetry prwTelemetry
84+
endpointURL *url.URL
85+
client *http.Client
86+
wg *sync.WaitGroup
87+
closeChan chan struct{}
88+
concurrency int
89+
userAgentHeader string
90+
maxBatchSizeBytes int
91+
clientSettings *confighttp.ClientConfig
92+
settings component.TelemetrySettings
93+
retrySettings configretry.BackOffConfig
94+
retryOnHTTP429 bool
95+
wal *prweWAL
96+
exporterSettings prometheusremotewrite.Settings
97+
telemetry prwTelemetry
98+
RemoteWriteProtoMsg config.RemoteWriteProtoMsg
9799

98100
// When concurrency is enabled, concurrent goroutines would potentially
99101
// fight over the same batchState object. To avoid this, we use a pool
@@ -133,6 +135,10 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
133135
return nil, err
134136
}
135137

138+
if err := config.RemoteWriteProtoMsg.Validate(cfg.RemoteWriteProtoMsg); err != nil {
139+
return nil, err
140+
}
141+
136142
userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)
137143

138144
concurrency := 5
@@ -147,16 +153,17 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
147153
telemetry.setNumberConsumer(context.Background(), int64(concurrency))
148154

149155
prwe := &prwExporter{
150-
endpointURL: endpointURL,
151-
wg: new(sync.WaitGroup),
152-
closeChan: make(chan struct{}),
153-
userAgentHeader: userAgentHeader,
154-
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
155-
concurrency: concurrency,
156-
clientSettings: &cfg.ClientConfig,
157-
settings: set.TelemetrySettings,
158-
retrySettings: cfg.BackOffConfig,
159-
retryOnHTTP429: retryOn429FeatureGate.IsEnabled(),
156+
endpointURL: endpointURL,
157+
wg: new(sync.WaitGroup),
158+
closeChan: make(chan struct{}),
159+
userAgentHeader: userAgentHeader,
160+
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
161+
concurrency: cfg.RemoteWriteQueue.NumConsumers,
162+
clientSettings: &cfg.ClientConfig,
163+
settings: set.TelemetrySettings,
164+
retrySettings: cfg.BackOffConfig,
165+
retryOnHTTP429: retryOn429FeatureGate.IsEnabled(),
166+
RemoteWriteProtoMsg: cfg.RemoteWriteProtoMsg,
160167
exporterSettings: prometheusremotewrite.Settings{
161168
Namespace: cfg.Namespace,
162169
ExternalLabels: sanitizedLabels,
@@ -168,6 +175,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
168175
batchStatePool: sync.Pool{New: func() any { return newBatchTimeServicesState() }},
169176
}
170177

178+
prwe.settings.Logger.Info("starting prometheus remote write exporter", zap.Any("ProtoMsg", cfg.RemoteWriteProtoMsg))
179+
171180
prwe.wal = newWAL(cfg.WAL, prwe.export)
172181
return prwe, nil
173182
}
@@ -201,6 +210,23 @@ func (prwe *prwExporter) Shutdown(context.Context) error {
201210
return err
202211
}
203212

213+
func (prwe *prwExporter) pushMetricsV1(ctx context.Context, md pmetric.Metrics) error {
214+
tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
215+
216+
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
217+
218+
var m []*prompb.MetricMetadata
219+
if prwe.exporterSettings.SendMetadata {
220+
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
221+
}
222+
if err != nil {
223+
prwe.telemetry.recordTranslationFailure(ctx)
224+
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
225+
}
226+
// Call export even if a conversion error, since there may be points that were successfully converted.
227+
return prwe.handleExport(ctx, tsMap, m)
228+
}
229+
204230
// PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
205231
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
206232
// exports the map.
@@ -213,21 +239,21 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
213239
return errors.New("shutdown has been called")
214240
default:
215241

216-
tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
217-
if err != nil {
218-
prwe.telemetry.recordTranslationFailure(ctx)
219-
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
242+
// If feature flag not enabled support only RW1.
243+
if !enableSendingRW2FeatureGate.IsEnabled() {
244+
return prwe.pushMetricsV1(ctx, md)
220245
}
221246

222-
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
247+
// If feature flag was enabled check if we want to send RW1 or RW2.
248+
switch prwe.RemoteWriteProtoMsg {
249+
case config.RemoteWriteProtoMsgV1:
250+
return prwe.pushMetricsV1(ctx, md)
251+
case config.RemoteWriteProtoMsgV2:
252+
return prwe.pushMetricsV2(ctx, md)
223253

224-
var m []*prompb.MetricMetadata
225-
if prwe.exporterSettings.SendMetadata {
226-
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
254+
default:
255+
return fmt.Errorf("unsupported remote-write protobuf message: %v", prwe.RemoteWriteProtoMsg)
227256
}
228-
229-
// Call export even if a conversion error, since there may be points that were successfully converted.
230-
return prwe.handleExport(ctx, tsMap, m)
231257
}
232258
}
233259

@@ -298,7 +324,19 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
298324
if !ok {
299325
return
300326
}
301-
if errExecute := prwe.execute(ctx, request); errExecute != nil {
327+
328+
buf := bufferPool.Get().(*buffer)
329+
buf.protobuf.Reset()
330+
defer bufferPool.Put(buf)
331+
332+
if errMarshal := buf.protobuf.Marshal(request); errMarshal != nil {
333+
mu.Lock()
334+
errs = multierr.Append(errs, consumererror.NewPermanent(errMarshal))
335+
mu.Unlock()
336+
return
337+
}
338+
339+
if errExecute := prwe.execute(ctx, buf); errExecute != nil {
302340
mu.Lock()
303341
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
304342
mu.Unlock()
@@ -312,16 +350,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
312350
return errs
313351
}
314352

315-
func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
316-
buf := bufferPool.Get().(*buffer)
317-
buf.protobuf.Reset()
318-
defer bufferPool.Put(buf)
319-
320-
// Uses proto.Marshal to convert the WriteRequest into bytes array
321-
errMarshal := buf.protobuf.Marshal(writeReq)
322-
if errMarshal != nil {
323-
return consumererror.NewPermanent(errMarshal)
324-
}
353+
func (prwe *prwExporter) execute(ctx context.Context, buf *buffer) error {
325354
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
326355
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
327356
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
@@ -354,10 +383,20 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
354383
// Add necessary headers specified by:
355384
// https://cortexmetrics.io/docs/apis/#remote-api
356385
req.Header.Add("Content-Encoding", "snappy")
357-
req.Header.Set("Content-Type", "application/x-protobuf")
358-
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
359386
req.Header.Set("User-Agent", prwe.userAgentHeader)
360387

388+
switch {
389+
// If feature flag not enabled support only RW1
390+
case !enableSendingRW2FeatureGate.IsEnabled(), prwe.RemoteWriteProtoMsg == config.RemoteWriteProtoMsgV1:
391+
req.Header.Set("Content-Type", "application/x-protobuf")
392+
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
393+
case prwe.RemoteWriteProtoMsg == config.RemoteWriteProtoMsgV2:
394+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
395+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
396+
default:
397+
return fmt.Errorf("unsupported remote-write protobuf message: %v (should be validated earlier)", prwe.RemoteWriteProtoMsg)
398+
}
399+
361400
resp, err := prwe.client.Do(req)
362401
prwe.telemetry.recordRemoteWriteSentBatch(ctx)
363402
if err != nil {

exporter/prometheusremotewriteexporter/exporter_concurrency_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/gogo/protobuf/proto"
1818
"github.com/golang/snappy"
19+
"github.com/prometheus/prometheus/config"
1920
"github.com/prometheus/prometheus/prompb"
2021
"github.com/stretchr/testify/assert"
2122
"github.com/stretchr/testify/require"
@@ -109,7 +110,8 @@ func Test_PushMetricsConcurrent(t *testing.T) {
109110
TargetInfo: &TargetInfo{
110111
Enabled: true,
111112
},
112-
BackOffConfig: retrySettings,
113+
BackOffConfig: retrySettings,
114+
RemoteWriteProtoMsg: config.RemoteWriteProtoMsgV1,
113115
}
114116

115117
assert.NotNil(t, cfg)

0 commit comments

Comments
 (0)