Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 3eaf7ac

Browse files
committed
actually use the WriteRequest pool we have
We were adding things to the pool, but never removing from it. Actually using the pool gives a 10% speedup. To do this we nil the correct Samples to prevent them from being clobbered; the old code would clobber a copy which could cause data corruption.
1 parent 1aa42ea commit 3eaf7ac

File tree

8 files changed

+38
-25
lines changed

8 files changed

+38
-25
lines changed

pkg/api/write.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ package api
22

33
import (
44
"fmt"
5+
"io/ioutil"
6+
"net/http"
7+
"sync/atomic"
8+
"time"
9+
510
"github.com/gogo/protobuf/proto"
611
"github.com/golang/snappy"
712
"github.com/prometheus/client_golang/prometheus"
813
io_prometheus_client "github.com/prometheus/client_model/go"
914
"github.com/timescale/timescale-prometheus/pkg/log"
1015
"github.com/timescale/timescale-prometheus/pkg/pgmodel"
11-
"github.com/timescale/timescale-prometheus/pkg/prompb"
1216
"github.com/timescale/timescale-prometheus/pkg/util"
13-
"io/ioutil"
14-
"net/http"
15-
"sync/atomic"
16-
"time"
1717
)
1818

1919
func Write(writer pgmodel.DBInserter, elector *util.Elector, metrics *Metrics) http.Handler {
@@ -48,8 +48,8 @@ func Write(writer pgmodel.DBInserter, elector *util.Elector, metrics *Metrics) h
4848
return
4949
}
5050

51-
var req prompb.WriteRequest
52-
if err := proto.Unmarshal(reqBuf, &req); err != nil {
51+
req := pgmodel.NewWriteRequest()
52+
if err := proto.Unmarshal(reqBuf, req); err != nil {
5353
log.Error("msg", "Unmarshal error", "err", err.Error())
5454
http.Error(w, err.Error(), http.StatusBadRequest)
5555
return
@@ -65,7 +65,7 @@ func Write(writer pgmodel.DBInserter, elector *util.Elector, metrics *Metrics) h
6565
metrics.ReceivedSamples.Add(float64(receivedBatchCount))
6666
begin := time.Now()
6767

68-
numSamples, err := writer.Ingest(req.GetTimeseries(), &req)
68+
numSamples, err := writer.Ingest(req.GetTimeseries(), req)
6969
if err != nil {
7070
log.Warn("msg", "Error sending samples to remote storage", "err", err, "num_samples", numSamples)
7171
http.Error(w, err.Error(), http.StatusInternalServerError)

pkg/pgmodel/end_to_end_tests/concurrent_sql_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func testConcurrentInsertSimple(t testing.TB, db *pgxpool.Pool, metric string) {
225225
t.Fatal(err)
226226
}
227227
defer ingestor.Close()
228-
_, err = ingestor.Ingest(metrics, NewWriteRequest())
228+
_, err = ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
229229
if err != nil {
230230
t.Fatal(err)
231231
}
@@ -285,7 +285,7 @@ func testConcurrentInsertAdvanced(t testing.TB, db *pgxpool.Pool) {
285285
}
286286

287287
defer ingestor.Close()
288-
_, err = ingestor.Ingest(metrics, NewWriteRequest())
288+
_, err = ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
289289
if err != nil {
290290
t.Fatal(err)
291291
}

pkg/pgmodel/end_to_end_tests/create_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func TestSQLChunkInterval(t *testing.T) {
160160
t.Fatal(err)
161161
}
162162
defer ingestor.Close()
163-
_, err = ingestor.Ingest(ts, NewWriteRequest())
163+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
164164
if err != nil {
165165
t.Fatal(err)
166166
}
@@ -401,7 +401,7 @@ func TestSQLIngest(t *testing.T) {
401401
}
402402
defer ingestor.Close()
403403

404-
cnt, err := ingestor.Ingest(tcase.metrics, NewWriteRequest())
404+
cnt, err := ingestor.Ingest(copyMetrics(tcase.metrics), NewWriteRequest())
405405
if err != nil && err != tcase.expectErr {
406406
t.Fatalf("got an unexpected error %v", err)
407407
}
@@ -479,7 +479,7 @@ func TestInsertCompressedDuplicates(t *testing.T) {
479479
t.Fatal(err)
480480
}
481481
defer ingestor.Close()
482-
_, err = ingestor.Ingest(ts, NewWriteRequest())
482+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
483483
if err != nil {
484484
t.Fatal(err)
485485
}
@@ -510,7 +510,7 @@ func TestInsertCompressedDuplicates(t *testing.T) {
510510
},
511511
}
512512

513-
_, err = ingestor.Ingest(ts, NewWriteRequest())
513+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
514514
if err != nil {
515515
t.Fatal(err)
516516
}
@@ -535,7 +535,7 @@ func TestInsertCompressedDuplicates(t *testing.T) {
535535
}
536536

537537
//ingest duplicate after compression
538-
_, err = ingestor.Ingest(ts, NewWriteRequest())
538+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
539539
if err != nil {
540540
t.Fatal(err)
541541
}
@@ -582,7 +582,7 @@ func TestInsertCompressed(t *testing.T) {
582582
t.Fatal(err)
583583
}
584584
defer ingestor.Close()
585-
_, err = ingestor.Ingest(ts, NewWriteRequest())
585+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
586586
if err != nil {
587587
t.Fatal(err)
588588
}
@@ -600,7 +600,7 @@ func TestInsertCompressed(t *testing.T) {
600600
}
601601
}
602602
//ingest after compression
603-
_, err = ingestor.Ingest(ts, NewWriteRequest())
603+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
604604
if err != nil {
605605
t.Fatal(err)
606606
}
@@ -614,3 +614,15 @@ func TestInsertCompressed(t *testing.T) {
614614
}
615615
})
616616
}
617+
618+
// deep copy the metrics since we mutate them, and don't want to invalidate the tests
619+
func copyMetrics(metrics []prompb.TimeSeries) []prompb.TimeSeries {
620+
out := make([]prompb.TimeSeries, len(metrics))
621+
copy(out, metrics)
622+
for i := range out {
623+
samples := make([]prompb.Sample, len(out[i].Samples))
624+
copy(samples, out[i].Samples)
625+
out[i].Samples = samples
626+
}
627+
return out
628+
}

pkg/pgmodel/end_to_end_tests/drop_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestSQLRetentionPeriod(t *testing.T) {
4949
t.Fatal(err)
5050
}
5151
defer ingestor.Close()
52-
_, err = ingestor.Ingest(ts, NewWriteRequest())
52+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
5353
if err != nil {
5454
t.Fatal(err)
5555
}
@@ -143,7 +143,7 @@ func TestSQLDropChunk(t *testing.T) {
143143
t.Fatal(err)
144144
}
145145
defer ingestor.Close()
146-
_, err = ingestor.Ingest(ts, NewWriteRequest())
146+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
147147
if err != nil {
148148
t.Error(err)
149149
}
@@ -233,7 +233,7 @@ func TestSQLDropMetricChunk(t *testing.T) {
233233
}
234234

235235
defer ingestor.Close()
236-
_, err = ingestor.Ingest(ts, NewWriteRequest())
236+
_, err = ingestor.Ingest(copyMetrics(ts), NewWriteRequest())
237237
if err != nil {
238238
t.Error(err)
239239
}

pkg/pgmodel/end_to_end_tests/nan_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestSQLStaleNaN(t *testing.T) {
7373
t.Fatal(err)
7474
}
7575
defer ingestor.Close()
76-
_, err = ingestor.Ingest(metrics, NewWriteRequest())
76+
_, err = ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
7777

7878
if err != nil {
7979
t.Fatalf("unexpected error while ingesting test dataset: %s", err)

pkg/pgmodel/end_to_end_tests/query_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func ingestQueryTestDataset(db *pgxpool.Pool, t testing.TB, metrics []prompb.Tim
565565
if err != nil {
566566
t.Fatal(err)
567567
}
568-
cnt, err := ingestor.Ingest(metrics, NewWriteRequest())
568+
cnt, err := ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
569569

570570
if err != nil {
571571
t.Fatalf("unexpected error while ingesting test dataset: %s", err)

pkg/pgmodel/end_to_end_tests/view_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestSQLView(t *testing.T) {
150150
}
151151

152152
defer ingestor.Close()
153-
_, err = ingestor.Ingest(metrics, NewWriteRequest())
153+
_, err = ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
154154

155155
if err != nil {
156156
t.Fatal(err)
@@ -248,7 +248,7 @@ func TestSQLViewSelectors(t *testing.T) {
248248
t.Fatal(err)
249249
}
250250
defer ingestor.Close()
251-
_, err = ingestor.Ingest(metrics, NewWriteRequest())
251+
_, err = ingestor.Ingest(copyMetrics(metrics), NewWriteRequest())
252252

253253
if err != nil {
254254
t.Fatal(err)

pkg/pgmodel/ingestor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ func (i *DBIngestor) parseData(tts []prompb.TimeSeries, req *prompb.WriteRequest
7676
dataSamples := make(map[string][]samplesInfo)
7777
rows := 0
7878

79-
for _, t := range tts {
79+
for i := range tts {
80+
t := &tts[i]
8081
if len(t.Samples) == 0 {
8182
continue
8283
}

0 commit comments

Comments
 (0)