Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions exporter/prometheusremotewriteexporter/exporter_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,13 @@ func (prwe *prwExporter) handleExportV2(ctx context.Context, symbolsTable writev
return nil
}

// TODO implement batching
// TODO how do we handle symbolsTable with batching?
requests := make([]*writev2.Request, 0)
tsArray := make([]writev2.TimeSeries, 0, len(tsMap))
for _, v := range tsMap {
tsArray = append(tsArray, *v)
state := prwe.batchStatePool.Get().(*batchTimeSeriesState)
defer prwe.batchStatePool.Put(state)
requests, err := batchTimeSeriesV2(tsMap, symbolsTable, prwe.maxBatchSizeBytes, state)
if err != nil {
return err
}

requests = append(requests, &writev2.Request{
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
Timeseries: orderBySampleTimestampV2(tsArray),
Symbols: symbolsTable.Symbols(),
})

// TODO implement WAl support, can be done after #15277 is fixed

return prwe.exportV2(ctx, requests)
Expand Down
11 changes: 0 additions & 11 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sort"

"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

type batchTimeSeriesState struct {
Expand Down Expand Up @@ -121,13 +120,3 @@ func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
}
return tsArray
}

func orderBySampleTimestampV2(tsArray []writev2.TimeSeries) []writev2.TimeSeries {
for i := range tsArray {
sL := tsArray[i].Samples
sort.Slice(sL, func(i, j int) bool {
return sL[i].Timestamp < sL[j].Timestamp
})
}
return tsArray
}
110 changes: 0 additions & 110 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"

"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -249,112 +248,3 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
}
}
}

// Ensure that before a prompb.WriteRequest is created, that the points per TimeSeries
// are sorted by Timestamp value, to prevent Prometheus from barfing when it gets poorly
// sorted values. See issues:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
func TestEnsureTimeseriesPointsAreSortedByTimestampV2(t *testing.T) {
outOfOrder := []writev2.TimeSeries{
{
Samples: []writev2.Sample{
{
Value: 10.11,
Timestamp: 1000,
},
{
Value: 7.81,
Timestamp: 2,
},
{
Value: 987.81,
Timestamp: 1,
},
{
Value: 18.22,
Timestamp: 999,
},
},
},
{
Samples: []writev2.Sample{
{
Value: 99.91,
Timestamp: 5,
},
{
Value: 4.33,
Timestamp: 3,
},
{
Value: 47.81,
Timestamp: 4,
},
{
Value: 18.22,
Timestamp: 8,
},
},
},
}
got := orderBySampleTimestampV2(outOfOrder)

// We must ensure that the resulting Timeseries' sample points are sorted by Timestamp.
want := []writev2.TimeSeries{
{
Samples: []writev2.Sample{
{
Value: 987.81,
Timestamp: 1,
},
{
Value: 7.81,
Timestamp: 2,
},
{
Value: 18.22,
Timestamp: 999,
},
{
Value: 10.11,
Timestamp: 1000,
},
},
},
{
Samples: []writev2.Sample{
{
Value: 4.33,
Timestamp: 3,
},
{
Value: 47.81,
Timestamp: 4,
},
{
Value: 99.91,
Timestamp: 5,
},
{
Value: 18.22,
Timestamp: 8,
},
},
},
}
assert.Equal(t, want, got)

// For a full sanity/logical check, assert that EVERY
// Sample has a Timestamp bigger than its prior values.
for ti, ts := range got {
for i := range ts.Samples {
si := ts.Samples[i]
for j := 0; j < i; j++ {
sj := ts.Samples[j]
assert.LessOrEqual(t, sj.Timestamp, si.Timestamp, "Timeseries[%d]: Sample[%d].Timestamp(%d) > Sample[%d].Timestamp(%d)",
ti, j, sj.Timestamp, i, si.Timestamp)
}
}
}
}
77 changes: 77 additions & 0 deletions exporter/prometheusremotewriteexporter/helper_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
"errors"
"sort"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

func batchTimeSeriesV2(tsMap map[string]*writev2.TimeSeries, symbolsTable writev2.SymbolsTable, maxBatchByteSize int, state *batchTimeSeriesState) ([]*writev2.Request, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

requests := make([]*writev2.Request, 0, max(10, state.nextRequestBufferSize))
tsArray := make([]writev2.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))

// Calculate symbols table size once since it's shared across batches
symbolsSize := 0
for _, symbol := range symbolsTable.Symbols() {
symbolsSize += len(symbol)
}

sizeOfCurrentBatch := symbolsSize // Initialize with symbols table size
i := 0

for _, v := range tsMap {
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
wrapped := convertTimeseriesToRequestV2(tsArray, symbolsTable)
requests = append(requests, wrapped)

tsArray = make([]writev2.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
sizeOfCurrentBatch = symbolsSize // Reset to symbols table size for new batch
}

tsArray = append(tsArray, *v)
sizeOfCurrentBatch += sizeOfSeries
i++
}

if len(tsArray) != 0 {
// TODO only sent necessary part of the symbolsTable
wrapped := convertTimeseriesToRequestV2(tsArray, symbolsTable)
requests = append(requests, wrapped)
}

state.nextRequestBufferSize = 2 * len(requests)
return requests, nil
}

func convertTimeseriesToRequestV2(tsArray []writev2.TimeSeries, symbolsTable writev2.SymbolsTable) *writev2.Request {
return &writev2.Request{
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
// TODO: try to sort while batching?
Timeseries: orderBySampleTimestampV2(tsArray),
Symbols: symbolsTable.Symbols(),
}
}

func orderBySampleTimestampV2(tsArray []writev2.TimeSeries) []writev2.TimeSeries {
for i := range tsArray {
sL := tsArray[i].Samples
sort.Slice(sL, func(i, j int) bool {
return sL[i].Timestamp < sL[j].Timestamp
})
}
return tsArray
}
Loading