Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit c8777f8

Browse files
committed
Add concurrent write test
This adds an end-to-end concurrent write test. The data generation here is modeled after how prometheus would send data. Namely requests come from queues that have non-overlapping series but do have overlapping metrics.
1 parent 26a0c52 commit c8777f8

File tree

1 file changed

+241
-0
lines changed

1 file changed

+241
-0
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package end_to_end_tests
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/ioutil"
7+
"math/rand"
8+
"net/http"
9+
"net/http/httptest"
10+
"net/url"
11+
"strings"
12+
"sync"
13+
"testing"
14+
"time"
15+
16+
"github.com/gogo/protobuf/proto"
17+
"github.com/golang/snappy"
18+
"github.com/jackc/pgx/v4/pgxpool"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/prometheus/common/model"
21+
"github.com/prometheus/common/route"
22+
"github.com/timescale/timescale-prometheus/pkg/api"
23+
"github.com/timescale/timescale-prometheus/pkg/internal/testhelpers"
24+
"github.com/timescale/timescale-prometheus/pkg/pgmodel"
25+
"github.com/timescale/timescale-prometheus/pkg/prompb"
26+
"github.com/timescale/timescale-prometheus/pkg/util"
27+
)
28+
29+
type dataGenerator struct {
30+
metricGroup int //nonoverlapping metrics
31+
queueID int
32+
33+
metricsPerGroup int
34+
labelSetsPerMetric int
35+
maxSamples int64
36+
deltaTime int64
37+
deltaValue float64
38+
currentTime int64
39+
}
40+
41+
func (t *dataGenerator) generateTimeseries() []prompb.TimeSeries {
42+
metrics := []prompb.TimeSeries{}
43+
for metric := 0; metric < t.metricsPerGroup; metric++ {
44+
for instance := 0; instance < t.labelSetsPerMetric; instance++ {
45+
labelSet := []prompb.Label{
46+
{Name: pgmodel.MetricNameLabelName, Value: fmt.Sprintf("metric_%d_%d", t.metricGroup, metric)},
47+
{Name: "foo", Value: fmt.Sprintf("bar_%d", t.queueID)}, //queues have non-overlapping label sets
48+
{Name: "instance", Value: fmt.Sprintf("%d", instance)},
49+
}
50+
51+
if instance%2 == 0 {
52+
labelSet = append(labelSet,
53+
prompb.Label{Name: "sometimes_label", Value: "constant"})
54+
}
55+
56+
metrics = append(metrics, prompb.TimeSeries{
57+
Labels: labelSet,
58+
})
59+
}
60+
61+
}
62+
63+
numSamples := rand.Int63n(t.maxSamples)
64+
for i := range metrics {
65+
metrics[i].Samples = t.generateSamples(numSamples)
66+
}
67+
t.currentTime += t.maxSamples*t.deltaTime + int64(1)
68+
return metrics
69+
}
70+
71+
func (t *dataGenerator) generateSamples(count int64) []prompb.Sample {
72+
samples := make([]prompb.Sample, 0, 3)
73+
i := int64(0)
74+
for i < count {
75+
samples = append(samples, prompb.Sample{
76+
Timestamp: t.currentTime + (t.deltaTime * int64(i)),
77+
Value: float64(t.metricGroup*10000) + (t.deltaValue * float64(i)),
78+
})
79+
i++
80+
}
81+
82+
return samples
83+
}
84+
85+
func getHTTPWriteRequest(protoRequest *prompb.WriteRequest) (*http.Request, error) {
86+
data, err := proto.Marshal(protoRequest)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
body := string(snappy.Encode(nil, data))
92+
u, err := url.Parse(fmt.Sprintf("http://%s:%d/write", testhelpers.PromHost, testhelpers.PromPort.Int()))
93+
94+
if err != nil {
95+
return nil, err
96+
}
97+
return http.NewRequest(
98+
"POST",
99+
u.String(),
100+
strings.NewReader(body),
101+
)
102+
}
103+
104+
func sendWriteRequest(t testing.TB, router *route.Router, ts []prompb.TimeSeries) {
105+
req, err := getHTTPWriteRequest(&prompb.WriteRequest{Timeseries: ts})
106+
if err != nil {
107+
t.Fatalf("unable to create PromQL label names request: %v", err)
108+
}
109+
110+
rec := httptest.NewRecorder()
111+
router.ServeHTTP(rec, req)
112+
113+
tsResp := rec.Result()
114+
if rec.Code != 200 {
115+
t.Fatal(rec.Code)
116+
}
117+
118+
_, err = ioutil.ReadAll(tsResp.Body)
119+
if err != nil {
120+
t.Fatalf("unexpected error returned when reading connector response body:\n%s\n", err.Error())
121+
}
122+
defer tsResp.Body.Close()
123+
}
124+
125+
func verifyTimeseries(t testing.TB, db *pgxpool.Pool, tsSlice []prompb.TimeSeries) {
126+
for tsIdx := range tsSlice {
127+
ts := tsSlice[tsIdx]
128+
name := ""
129+
names := []string{}
130+
values := []string{}
131+
for labelIdx := range ts.Labels {
132+
label := ts.Labels[labelIdx]
133+
if label.Name == pgmodel.MetricNameLabelName {
134+
name = label.Value
135+
136+
}
137+
names = append(names, label.Name)
138+
values = append(values, label.Value)
139+
}
140+
if name == "" {
141+
t.Fatal("No ts series metric name found")
142+
}
143+
for sampleIdx := range ts.Samples {
144+
sample := ts.Samples[sampleIdx]
145+
rows, err := db.Query(context.Background(), fmt.Sprintf("SELECT value FROM prom_data.%s WHERE time = $1 and series_id = (SELECT series_id FROM _prom_catalog.get_or_create_series_id_for_kv_array($2, $3, $4))",
146+
name), model.Time(sample.Timestamp).Time(), name, names, values)
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
defer rows.Close()
151+
count := 0
152+
for rows.Next() {
153+
var val *float64
154+
err := rows.Scan(&val)
155+
if err != nil {
156+
t.Fatal(err)
157+
}
158+
if val == nil {
159+
t.Fatal("NULL value")
160+
}
161+
if *val != sample.Value {
162+
t.Errorf("Unexpected value: got %v, unexpected %v", *val, sample.Value)
163+
}
164+
count++
165+
}
166+
if count != 1 {
167+
t.Errorf("Unexpected count: %d", count)
168+
}
169+
}
170+
}
171+
}
172+
173+
func getWriteRouter(t testing.TB, db *pgxpool.Pool) *route.Router {
174+
r, err := pgmodel.NewPgxIngestor(db)
175+
if err != nil {
176+
t.Fatal(err)
177+
}
178+
179+
writeHandler := api.Write(r, nil, &api.Metrics{
180+
LeaderGauge: prometheus.NewGauge(prometheus.GaugeOpts{}),
181+
ReceivedSamples: prometheus.NewCounter(prometheus.CounterOpts{}),
182+
FailedSamples: prometheus.NewCounter(prometheus.CounterOpts{}),
183+
SentSamples: prometheus.NewCounter(prometheus.CounterOpts{}),
184+
SentBatchDuration: prometheus.NewHistogram(prometheus.HistogramOpts{}),
185+
WriteThroughput: util.NewThroughputCalc(time.Second),
186+
})
187+
188+
router := route.New()
189+
router.Post("/write", writeHandler.ServeHTTP)
190+
return router
191+
}
192+
193+
func sendConcurrentWrites(t testing.TB, db *pgxpool.Pool, queues int, metricGroups int, totalRequests int, duplicates bool) {
194+
router := getWriteRouter(t, db)
195+
196+
wg := sync.WaitGroup{}
197+
for i := 0; i < queues; i++ {
198+
for m := 0; m < metricGroups; m++ {
199+
queueIdx := i
200+
midx := m
201+
wg.Add(1)
202+
go func() {
203+
defer wg.Done()
204+
dg := dataGenerator{
205+
metricGroup: midx,
206+
queueID: queueIdx,
207+
208+
metricsPerGroup: 2,
209+
labelSetsPerMetric: 2,
210+
maxSamples: 5,
211+
deltaTime: 2000,
212+
deltaValue: 3,
213+
currentTime: startTime,
214+
}
215+
tss := [][]prompb.TimeSeries{}
216+
for requestNo := 0; requestNo < totalRequests; requestNo++ {
217+
ts := dg.generateTimeseries()
218+
sendWriteRequest(t, router, ts)
219+
if duplicates {
220+
sendWriteRequest(t, router, ts)
221+
}
222+
tss = append(tss, ts)
223+
}
224+
for i := range tss {
225+
verifyTimeseries(t, db, tss[i])
226+
}
227+
}()
228+
}
229+
}
230+
wg.Wait()
231+
}
232+
233+
func TestWrite(t *testing.T) {
234+
if testing.Short() {
235+
t.Skip("skipping integration test")
236+
}
237+
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
238+
sendConcurrentWrites(t, db, 2, 2, 5, false)
239+
sendConcurrentWrites(t, db, 2, 2, 3, true)
240+
})
241+
}

0 commit comments

Comments
 (0)