Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit e5e02e3

Browse files
Tweaks for local go bench
Allow loading the whole test data set into memory, which can be useful when benchmarking memory allocations. Don't block on sending ingest requests while creating batches, which improves data load speed.
1 parent dd8f633 commit e5e02e3

File tree

2 files changed

+66
-23
lines changed

2 files changed

+66
-23
lines changed

pkg/tests/end_to_end_tests/metric_ingest_bench_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ var prometheusDataGzip = "../testdata/prometheus-data.tar.gz"
2121
func TestPromLoader(t *testing.T) {
2222
data, err := extractPrometheusData(prometheusDataGzip, t.TempDir())
2323
require.NoError(t, err, "failed to extract prometheus data")
24-
loader, err := testsupport.NewPromLoader(data)
24+
loader, err := testsupport.NewPromLoader(data, false)
2525
if err != nil {
2626
t.Fatal(err)
2727
}
@@ -48,7 +48,7 @@ func BenchmarkMetricIngest(b *testing.B) {
4848
if err != nil {
4949
b.Fatalf("failed to extract prometheus data: %v", err)
5050
}
51-
loader, err := testsupport.NewPromLoader(data)
51+
loader, err := testsupport.NewPromLoader(data, true) // load whole dataset in memory so we can better track allocations during ingest
5252
require.NoError(b, err)
5353
defer func() {
5454
if err := loader.Close(); err != nil {

pkg/tests/testsupport/metric_loader.go

Lines changed: 64 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,9 @@ type PromLoader interface {
2121
}
2222

2323
type promLoader struct {
24-
db *tsdb.DBReadOnly
25-
blocks []tsdb.BlockReader
24+
db *tsdb.DBReadOnly
25+
blocks []tsdb.BlockReader
26+
inMemory bool
2627
}
2728

2829
// PromIterator allows us to iterate over Prometheus data
@@ -31,6 +32,27 @@ type PromIterator interface {
3132
Get() TimeSeries
3233
}
3334

35+
type inMemoryIterator struct {
36+
data []TimeSeries
37+
curIdx int
38+
}
39+
40+
func (s *inMemoryIterator) Next() bool {
41+
if s.curIdx == len(s.data)-1 {
42+
return false
43+
}
44+
s.curIdx++
45+
return true
46+
}
47+
48+
func (s *inMemoryIterator) Get() TimeSeries {
49+
return s.data[s.curIdx]
50+
}
51+
52+
func (s *inMemoryIterator) append(ts TimeSeries) {
53+
s.data = append(s.data, ts)
54+
}
55+
3456
type TimeSeries struct {
3557
seriesHash uint64
3658
Val prompb.TimeSeries
@@ -47,7 +69,7 @@ type promIterator struct {
4769
blocks []tsdb.BlockReader
4870
curBlockIdx int
4971
labelsCache map[uint64]labels.Labels
50-
blockSamples []*BlockSample
72+
blockSamples []BlockSample
5173
curSampleIdx int
5274
}
5375

@@ -81,7 +103,7 @@ func (i *promIterator) Next() bool {
81103
func (it *promIterator) loadBlockSamples() error {
82104
log.Info("msg", "loading blocks", "total samples", it.blocks[it.curBlockIdx].Meta().Stats.NumSamples,
83105
"series", it.blocks[it.curBlockIdx].Meta().Stats.NumSeries)
84-
it.blockSamples = make([]*BlockSample, it.blocks[it.curBlockIdx].Meta().Stats.NumSamples)
106+
it.blockSamples = make([]BlockSample, it.blocks[it.curBlockIdx].Meta().Stats.NumSamples)
85107
it.labelsCache = make(map[uint64]labels.Labels, it.blocks[it.curBlockIdx].Meta().Stats.NumSeries)
86108
querier, err := tsdb.NewBlockQuerier(it.blocks[it.curBlockIdx], math.MinInt64, math.MaxInt64)
87109
if err != nil {
@@ -103,8 +125,7 @@ func (it *promIterator) loadBlockSamples() error {
103125
}
104126
for seriesIt.Next() {
105127
ts, val := seriesIt.At()
106-
sample := &BlockSample{ts, val, lblsHash}
107-
it.blockSamples[sampleCounter] = sample
128+
it.blockSamples[sampleCounter] = BlockSample{ts, val, lblsHash}
108129
sampleCounter++
109130
}
110131
}
@@ -130,11 +151,13 @@ func (i *promIterator) Get() TimeSeries {
130151
Labels: protoLabels,
131152
Samples: []prompb.Sample{sample},
132153
}
133-
134154
return TimeSeries{blockSample.lblsHash, ts}
135155
}
136156

137-
func NewPromLoader(dataDir string) (PromLoader, error) {
157+
// NewPromLoader can preload the whole dataset into memory, which is useful for
158+
// getting accurate memory allocation figures when benchmarking. However, it means the bench
159+
// test needs more memory to run, so make sure that the test dataset can fit into memory.
160+
func NewPromLoader(dataDir string, inMemory bool) (PromLoader, error) {
138161
db, err := tsdb.OpenDBReadOnly(dataDir, nil)
139162
if err != nil {
140163
return nil, fmt.Errorf("error starting Prometheus TSDB in read-only: %v", err)
@@ -143,11 +166,20 @@ func NewPromLoader(dataDir string) (PromLoader, error) {
143166
if err != nil {
144167
return nil, fmt.Errorf("error loading data blocks: %v", err)
145168
}
146-
return &promLoader{db: db, blocks: blocks}, nil
169+
return &promLoader{db: db, blocks: blocks, inMemory: inMemory}, nil
147170
}
148171

149172
func (loader *promLoader) Iterator() PromIterator {
150-
return &promIterator{blocks: loader.blocks, curSampleIdx: -1, curBlockIdx: -1}
173+
it := &promIterator{blocks: loader.blocks, curSampleIdx: -1, curBlockIdx: -1}
174+
if loader.inMemory {
175+
store := &inMemoryIterator{data: make([]TimeSeries, 0), curIdx: -1}
176+
for it.Next() {
177+
ts := it.Get()
178+
store.append(ts)
179+
}
180+
return store
181+
}
182+
return it
151183
}
152184

153185
func (loader *promLoader) Close() error {
@@ -200,29 +232,40 @@ func (si *sampleIngestor) shardSamples() {
200232
si.shards[shardIdx] <- sample.Val
201233
if si.rate != nil {
202234
if err := si.rate.Wait(context.Background()); err != nil {
203-
log.Error(err)
235+
log.Error("msg", err)
204236
}
205237
}
206238
}
207239
}()
208240
}
209241

210242
func (si *sampleIngestor) ingestSamples(ingest IngestFunc) {
211-
var wg sync.WaitGroup
243+
var shardWg sync.WaitGroup
244+
var ingestWg sync.WaitGroup
245+
reqCh := make(chan prompb.WriteRequest, 100)
212246
for i := 0; i < len(si.shards); i++ {
213-
wg.Add(1)
247+
ingestWg.Add(1)
248+
go func() {
249+
defer func() {
250+
ingestWg.Done()
251+
}()
252+
for req := range reqCh {
253+
if _, _, err := ingest(context.Background(), &req); err != nil {
254+
log.Error("msg", err)
255+
}
256+
}
257+
}()
258+
shardWg.Add(1)
214259
go func(shard int) {
215260
defer func() {
216-
wg.Done()
261+
shardWg.Done()
217262
}()
218263
var req prompb.WriteRequest = prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, si.batchSize)}
219264
counter := 0
220265
for ts := range si.shards[shard] {
221266
if counter == si.batchSize {
222-
if _, _, err := ingest(context.Background(), &req); err != nil {
223-
log.Error(err)
224-
}
225267
req = prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, si.batchSize)}
268+
reqCh <- req
226269
counter = 0
227270
} else {
228271
req.Timeseries[counter] = ts
@@ -231,11 +274,11 @@ func (si *sampleIngestor) ingestSamples(ingest IngestFunc) {
231274
}
232275
if len(req.Timeseries) > 0 {
233276
// flush leftovers
234-
if _, _, err := ingest(context.Background(), &req); err != nil {
235-
log.Error(err)
236-
}
277+
reqCh <- req
237278
}
238279
}(i)
239280
}
240-
wg.Wait()
281+
shardWg.Wait()
282+
close(reqCh)
283+
ingestWg.Wait()
241284
}

0 commit comments

Comments
 (0)