Skip to content

Commit 450d195

Browse files
jmichalek132ywwg
authored andcommitted
[chore] prom rw v2 exporter add support for batching (open-telemetry#40051)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Implementing batching support in RW2. In this somewhat naive implementation we send the full symbolsTable with each separate request. Batching that also splits up the symbolsTable should be doable if we push the logic down into the translator package. However, it's not easy to implement batching based on byte size instead of number of samples, so I wonder if for RW2 we could switch to batching based on number of samples, similar to what Prometheus does. WDYT @ArthurSens @dashpole @ywwg ? <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Partially implements open-telemetry#33661 (when merging the PR, please don't close the tracking issue) --------- Co-authored-by: Owen Williams <[email protected]>
1 parent 033d3d4 commit 450d195

File tree

6 files changed

+341
-136
lines changed

6 files changed

+341
-136
lines changed

exporter/prometheusremotewriteexporter/exporter_v2.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,23 +93,13 @@ func (prwe *prwExporter) handleExportV2(ctx context.Context, symbolsTable writev
9393
return nil
9494
}
9595

96-
// TODO implement batching
97-
// TODO how do we handle symbolsTable with batching?
98-
requests := make([]*writev2.Request, 0)
99-
tsArray := make([]writev2.TimeSeries, 0, len(tsMap))
100-
for _, v := range tsMap {
101-
tsArray = append(tsArray, *v)
96+
state := prwe.batchStatePool.Get().(*batchTimeSeriesState)
97+
defer prwe.batchStatePool.Put(state)
98+
requests, err := batchTimeSeriesV2(tsMap, symbolsTable, prwe.maxBatchSizeBytes, state)
99+
if err != nil {
100+
return err
102101
}
103102

104-
requests = append(requests, &writev2.Request{
105-
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
106-
// See:
107-
// * https://github.com/open-telemetry/wg-prometheus/issues/10
108-
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
109-
Timeseries: orderBySampleTimestampV2(tsArray),
110-
Symbols: symbolsTable.Symbols(),
111-
})
112-
113103
// TODO implement WAL support, can be done after #15277 is fixed
114104

115105
return prwe.exportV2(ctx, requests)

exporter/prometheusremotewriteexporter/helper.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sort"
1010

1111
"github.com/prometheus/prometheus/prompb"
12-
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1312
)
1413

1514
type batchTimeSeriesState struct {
@@ -121,13 +120,3 @@ func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
121120
}
122121
return tsArray
123122
}
124-
125-
func orderBySampleTimestampV2(tsArray []writev2.TimeSeries) []writev2.TimeSeries {
126-
for i := range tsArray {
127-
sL := tsArray[i].Samples
128-
sort.Slice(sL, func(i, j int) bool {
129-
return sL[i].Timestamp < sL[j].Timestamp
130-
})
131-
}
132-
return tsArray
133-
}

exporter/prometheusremotewriteexporter/helper_test.go

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"testing"
99

1010
"github.com/prometheus/prometheus/prompb"
11-
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1211
"github.com/stretchr/testify/assert"
1312
)
1413

@@ -249,112 +248,3 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
249248
}
250249
}
251250
}
252-
253-
// Ensure that before a prompb.WriteRequest is created, that the points per TimeSeries
254-
// are sorted by Timestamp value, to prevent Prometheus from barfing when it gets poorly
255-
// sorted values. See issues:
256-
// * https://github.com/open-telemetry/wg-prometheus/issues/10
257-
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
258-
func TestEnsureTimeseriesPointsAreSortedByTimestampV2(t *testing.T) {
259-
outOfOrder := []writev2.TimeSeries{
260-
{
261-
Samples: []writev2.Sample{
262-
{
263-
Value: 10.11,
264-
Timestamp: 1000,
265-
},
266-
{
267-
Value: 7.81,
268-
Timestamp: 2,
269-
},
270-
{
271-
Value: 987.81,
272-
Timestamp: 1,
273-
},
274-
{
275-
Value: 18.22,
276-
Timestamp: 999,
277-
},
278-
},
279-
},
280-
{
281-
Samples: []writev2.Sample{
282-
{
283-
Value: 99.91,
284-
Timestamp: 5,
285-
},
286-
{
287-
Value: 4.33,
288-
Timestamp: 3,
289-
},
290-
{
291-
Value: 47.81,
292-
Timestamp: 4,
293-
},
294-
{
295-
Value: 18.22,
296-
Timestamp: 8,
297-
},
298-
},
299-
},
300-
}
301-
got := orderBySampleTimestampV2(outOfOrder)
302-
303-
// We must ensure that the resulting Timeseries' sample points are sorted by Timestamp.
304-
want := []writev2.TimeSeries{
305-
{
306-
Samples: []writev2.Sample{
307-
{
308-
Value: 987.81,
309-
Timestamp: 1,
310-
},
311-
{
312-
Value: 7.81,
313-
Timestamp: 2,
314-
},
315-
{
316-
Value: 18.22,
317-
Timestamp: 999,
318-
},
319-
{
320-
Value: 10.11,
321-
Timestamp: 1000,
322-
},
323-
},
324-
},
325-
{
326-
Samples: []writev2.Sample{
327-
{
328-
Value: 4.33,
329-
Timestamp: 3,
330-
},
331-
{
332-
Value: 47.81,
333-
Timestamp: 4,
334-
},
335-
{
336-
Value: 99.91,
337-
Timestamp: 5,
338-
},
339-
{
340-
Value: 18.22,
341-
Timestamp: 8,
342-
},
343-
},
344-
},
345-
}
346-
assert.Equal(t, want, got)
347-
348-
// For a full sanity/logical check, assert that EVERY
349-
// Sample has a Timestamp bigger than its prior values.
350-
for ti, ts := range got {
351-
for i := range ts.Samples {
352-
si := ts.Samples[i]
353-
for j := 0; j < i; j++ {
354-
sj := ts.Samples[j]
355-
assert.LessOrEqual(t, sj.Timestamp, si.Timestamp, "Timeseries[%d]: Sample[%d].Timestamp(%d) > Sample[%d].Timestamp(%d)",
356-
ti, j, sj.Timestamp, i, si.Timestamp)
357-
}
358-
}
359-
}
360-
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
5+
6+
import (
7+
"errors"
8+
"sort"
9+
10+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
11+
)
12+
13+
func batchTimeSeriesV2(tsMap map[string]*writev2.TimeSeries, symbolsTable writev2.SymbolsTable, maxBatchByteSize int, state *batchTimeSeriesState) ([]*writev2.Request, error) {
14+
if len(tsMap) == 0 {
15+
return nil, errors.New("invalid tsMap: cannot be empty map")
16+
}
17+
18+
requests := make([]*writev2.Request, 0, max(10, state.nextRequestBufferSize))
19+
tsArray := make([]writev2.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
20+
21+
// Calculate symbols table size once since it's shared across batches
22+
symbolsSize := 0
23+
for _, symbol := range symbolsTable.Symbols() {
24+
symbolsSize += len(symbol)
25+
}
26+
27+
sizeOfCurrentBatch := symbolsSize // Initialize with symbols table size
28+
i := 0
29+
30+
for _, v := range tsMap {
31+
sizeOfSeries := v.Size()
32+
33+
if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
34+
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
35+
wrapped := convertTimeseriesToRequestV2(tsArray, symbolsTable)
36+
requests = append(requests, wrapped)
37+
38+
tsArray = make([]writev2.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
39+
sizeOfCurrentBatch = symbolsSize // Reset to symbols table size for new batch
40+
}
41+
42+
tsArray = append(tsArray, *v)
43+
sizeOfCurrentBatch += sizeOfSeries
44+
i++
45+
}
46+
47+
if len(tsArray) != 0 {
48+
// TODO only sent necessary part of the symbolsTable
49+
wrapped := convertTimeseriesToRequestV2(tsArray, symbolsTable)
50+
requests = append(requests, wrapped)
51+
}
52+
53+
state.nextRequestBufferSize = 2 * len(requests)
54+
return requests, nil
55+
}
56+
57+
func convertTimeseriesToRequestV2(tsArray []writev2.TimeSeries, symbolsTable writev2.SymbolsTable) *writev2.Request {
58+
return &writev2.Request{
59+
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
60+
// See:
61+
// * https://github.com/open-telemetry/wg-prometheus/issues/10
62+
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
63+
// TODO: try to sort while batching?
64+
Timeseries: orderBySampleTimestampV2(tsArray),
65+
Symbols: symbolsTable.Symbols(),
66+
}
67+
}
68+
69+
func orderBySampleTimestampV2(tsArray []writev2.TimeSeries) []writev2.TimeSeries {
70+
for i := range tsArray {
71+
sL := tsArray[i].Samples
72+
sort.Slice(sL, func(i, j int) bool {
73+
return sL[i].Timestamp < sL[j].Timestamp
74+
})
75+
}
76+
return tsArray
77+
}

0 commit comments

Comments
 (0)