
Commit 016a0a4

Add label cache, and only fetch missing labels at query time
In simple queries, approximately one third of our query runtime is spent in key_value_array. To reduce that, this commit adds an id-to-label cache in the connector, so that at query time we only fetch labels we have never seen before.
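The shape of the change: at query time, first probe an in-process id-to-label cache for every label id in a series, then issue a single key_value_array call for only the ids that missed, and write the fetched labels back into the cache. Below is a minimal Go sketch of that lookup pattern; the real code in pkg/pgmodel/pgx.go (see the diff further down) uses pkg/clockcache and the Prometheus label types, while here a plain map and a fetch callback stand in.

package main

import "fmt"

// labelsFor resolves label ids to label strings, touching the "database"
// only for ids that are not already cached. Simplified sketch: cache is a
// plain map, fetchFromDB stands in for the key_value_array query.
func labelsFor(ids []int64, cache map[int64]string, fetchFromDB func([]int64) (map[int64]string, error)) ([]string, error) {
	var misses []int64
	for _, id := range ids {
		if _, ok := cache[id]; !ok {
			misses = append(misses, id)
		}
	}
	if len(misses) > 0 {
		fetched, err := fetchFromDB(misses)
		if err != nil {
			return nil, err
		}
		for id, l := range fetched {
			cache[id] = l // remember for future queries
		}
	}
	result := make([]string, 0, len(ids))
	for _, id := range ids {
		result = append(result, cache[id])
	}
	return result, nil
}

func main() {
	cache := map[int64]string{1: `job="prometheus"`}
	fetch := func(ids []int64) (map[int64]string, error) {
		out := make(map[int64]string, len(ids))
		for _, id := range ids {
			out[id] = fmt.Sprintf("label-%d", id) // pretend DB lookup
		}
		return out, nil
	}
	lls, _ := labelsFor([]int64{1, 2, 3}, cache, fetch)
	fmt.Println(lls) // only ids 2 and 3 reach the "database"
}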
1 parent: 6afb228

11 files changed: +327, -312 lines changed

pkg/clockcache/cache.go

Lines changed: 0 additions & 3 deletions
@@ -36,9 +36,6 @@ type element struct {
 }

 func WithMax(max uint64) *Cache {
-	if max < 1 {
-		panic("must have max greater than 0")
-	}
 	return &Cache{
 		elements: make(map[interface{}]*element, max),
 		storage: make([]element, 0, max),
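For context, this is roughly how the cache is used elsewhere in this commit. A minimal sketch, assuming the InsertBatch/GetValues semantics implied by the call sites in pkg/pgmodel/pgx.go below (batch insert of key/value pairs, batch lookup returning the number of hits); the exact ordering of hits and misses is not asserted here.

package main

import (
	"fmt"

	"github.com/timescale/timescale-prometheus/pkg/clockcache"
)

func main() {
	// With this commit, WithMax no longer panics when max < 1.
	cache := clockcache.WithMax(100)

	// Batch insert and batch lookup, mirroring the calls in pkg/pgmodel/pgx.go.
	keys := []interface{}{int64(1), int64(2)}
	vals := []interface{}{"a", "b"}
	cache.InsertBatch(keys, vals)

	lookup := []interface{}{int64(1), int64(2)}
	out := make([]interface{}, len(lookup))
	hits := cache.GetValues(lookup, out)
	fmt.Println(hits, out)
}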

pkg/pgclient/client.go

Lines changed: 5 additions & 2 deletions
@@ -4,9 +4,10 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"github.com/prometheus/client_golang/prometheus"
 	"runtime"

+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/allegro/bigcache"
 	"github.com/jackc/pgx/v4/pgxpool"

@@ -29,6 +30,7 @@ type Config struct {
 	dbConnectRetries int
 	AsyncAcks bool
 	ReportInterval int
+	LabelsCacheSize uint64
 }

 // ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB

@@ -42,6 +44,7 @@ func ParseFlags(cfg *Config) *Config {
 	flag.IntVar(&cfg.dbConnectRetries, "db-connect-retries", 0, "How many times to retry connecting to the database")
 	flag.BoolVar(&cfg.AsyncAcks, "async-acks", false, "Ack before data is written to DB")
 	flag.IntVar(&cfg.ReportInterval, "tput-report", 0, "interval in seconds at which throughput should be reported")
+	flag.Uint64Var(&cfg.LabelsCacheSize, "labels-cache-size", 10000, "maximum number of labels to cache")
 	return cfg
 }

@@ -84,7 +87,7 @@ func NewClient(cfg *Config, readHist prometheus.ObserverVec) (*Client, error) {
 		log.Error("err starting ingestor", err)
 		return nil, err
 	}
-	reader := pgmodel.NewPgxReaderWithMetricCache(connectionPool, cache)
+	reader := pgmodel.NewPgxReaderWithMetricCache(connectionPool, cache, cfg.LabelsCacheSize)

 	queryable := query.NewQueryable(reader.GetQuerier())
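The new option flows through the standard flag package. A minimal standalone sketch of the same registration; the flag name, default (10000), and Config field come from the diff above, while the surrounding main is illustrative only.

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the flag added above: same name and default.
	var labelsCacheSize uint64
	flag.Uint64Var(&labelsCacheSize, "labels-cache-size", 10000, "maximum number of labels to cache")
	flag.Parse()

	// Running with `-labels-cache-size 50000` would raise the cap accordingly.
	fmt.Println("labels cache size:", labelsCacheSize)
}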

pkg/pgmodel/end_to_end_tests/nan_test.go

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ func TestSQLStaleNaN(t *testing.T) {
 	}

 	for _, c := range query {
-		r := NewPgxReader(db, nil)
+		r := NewPgxReader(db, nil, 100)
 		resp, err := r.Read(&c.rrq)
 		startMs := c.rrq.Queries[0].StartTimestampMs
 		endMs := c.rrq.Queries[0].EndTimestampMs

pkg/pgmodel/end_to_end_tests/promql_endpoint_integration_test.go

Lines changed: 4 additions & 3 deletions
@@ -3,7 +3,6 @@ package end_to_end_tests
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/prometheus/common/route"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"

@@ -13,6 +12,8 @@ import (
 	"testing"
 	"time"

+	"github.com/prometheus/common/route"
+
 	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/prometheus/common/model"
 	"github.com/timescale/timescale-prometheus/pkg/api"

@@ -347,7 +348,7 @@ func TestPromQLQueryEndpoint(t *testing.T) {
 		return
 	}

-	r := pgmodel.NewPgxReader(readOnly, nil)
+	r := pgmodel.NewPgxReader(readOnly, nil, 100)
 	queryable := query.NewQueryable(r.GetQuerier())
 	queryEngine := query.NewEngine(log.GetLogger(), time.Minute)

@@ -401,7 +402,7 @@ func TestPromQLLabelEndpoints(t *testing.T) {
 		return
 	}

-	r := pgmodel.NewPgxReader(readOnly, nil)
+	r := pgmodel.NewPgxReader(readOnly, nil, 100)
 	queryable := query.NewQueryable(r.GetQuerier())

 	labelNamesHandler := api.Labels(queryable)

pkg/pgmodel/end_to_end_tests/query_integration_test.go

Lines changed: 3 additions & 3 deletions
@@ -534,7 +534,7 @@ func TestSQLQuery(t *testing.T) {
 		t.Fatalf("Cannot run test, not an instance of testing.T")
 	}

-	r := NewPgxReader(readOnly, nil)
+	r := NewPgxReader(readOnly, nil, 100)
 	for _, c := range testCases {
 		tester.Run(c.name, func(t *testing.T) {
 			resp, err := r.Read(&c.readRequest)

@@ -915,7 +915,7 @@ func TestPromQL(t *testing.T) {
 		return
 	}

-	r := NewPgxReader(readOnly, nil)
+	r := NewPgxReader(readOnly, nil, 100)
 	for _, c := range testCases {
 		tester.Run(c.name, func(t *testing.T) {
 			connResp, connErr := r.Read(c.readRequest)

@@ -1184,7 +1184,7 @@ func TestPushdown(t *testing.T) {
 		return
 	}

-	r := NewPgxReader(readOnly, nil)
+	r := NewPgxReader(readOnly, nil, 100)
 	queryable := query.NewQueryable(r.GetQuerier())
 	queryEngine := query.NewEngine(log.GetLogger(), time.Minute)

pkg/pgmodel/labels.go

Lines changed: 2 additions & 7 deletions
@@ -161,13 +161,8 @@ func (l *Labels) Less(i, j int) bool {
 }

 func (l *Labels) Swap(i, j int) {
-	tmp := l.names[j]
-	l.names[j] = l.names[i]
-	l.names[i] = tmp
-
-	tmp = l.values[j]
-	l.values[j] = l.values[i]
-	l.values[i] = tmp
+	l.names[j], l.names[i] = l.names[i], l.names[j]
+	l.values[j], l.values[i] = l.values[i], l.values[j]
 }

 // FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.

pkg/pgmodel/pgx.go

Lines changed: 85 additions & 5 deletions
@@ -25,6 +25,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/timescale/timescale-prometheus/pkg/clockcache"
 	"github.com/timescale/timescale-prometheus/pkg/log"
 	"github.com/timescale/timescale-prometheus/pkg/prompb"
 )

@@ -883,12 +884,13 @@ func (p *pendingBuffer) addReq(req insertDataRequest) bool {

 // NewPgxReaderWithMetricCache returns a new DBReader that reads from PostgreSQL using PGX
 // and caches metric table names using the supplied cacher.
-func NewPgxReaderWithMetricCache(c *pgxpool.Pool, cache MetricCache) *DBReader {
+func NewPgxReaderWithMetricCache(c *pgxpool.Pool, cache MetricCache, labelsCacheSize uint64) *DBReader {
 	pi := &pgxQuerier{
 		conn: &pgxConnImpl{
 			conn: c,
 		},
 		metricTableNames: cache,
+		labels: clockcache.WithMax(labelsCacheSize),
 	}

 	return &DBReader{

@@ -897,10 +899,10 @@ func NewPgxReaderWithMetricCache(c *pgxpool.Pool, cache MetricCache) *DBReader {
 }

 // NewPgxReader returns a new DBReader that reads that from PostgreSQL using PGX.
-func NewPgxReader(c *pgxpool.Pool, readHist prometheus.ObserverVec) *DBReader {
+func NewPgxReader(c *pgxpool.Pool, readHist prometheus.ObserverVec, labelsCacheSize uint64) *DBReader {
 	metrics, _ := bigcache.NewBigCache(DefaultCacheConfig())
 	cache := &MetricNameCache{metrics}
-	return NewPgxReaderWithMetricCache(c, cache)
+	return NewPgxReaderWithMetricCache(c, cache, labelsCacheSize)
 }

 type metricTimeRangeFilter struct {

@@ -912,6 +914,8 @@ type metricTimeRangeFilter struct {
 type pgxQuerier struct {
 	conn pgxConn
 	metricTableNames MetricCache
+	// contains [int64]labels.Label
+	labels *clockcache.Cache
 }

 // HealthCheck implements the healtchecker interface

@@ -933,7 +937,7 @@ func (q *pgxQuerier) Select(mint int64, maxt int64, sortSeries bool, hints *stor
 		return nil, nil, nil, err
 	}

-	ss, warn, err := buildSeriesSet(rows, sortSeries)
+	ss, warn, err := buildSeriesSet(rows, sortSeries, q)
 	return ss, topNode, warn, err
 }

@@ -963,7 +967,7 @@ func (q *pgxQuerier) Query(query *prompb.Query) ([]*prompb.TimeSeries, error) {
 	results := make([]*prompb.TimeSeries, 0, len(rows))

 	for _, r := range rows {
-		ts, err := buildTimeSeries(r)
+		ts, err := buildTimeSeries(r, q)

 		if err != nil {
 			return nil, err

@@ -1021,6 +1025,82 @@ func (q *pgxQuerier) LabelValues(labelName string) ([]string, error) {
 	return labelValues, nil
 }

+const GetLabelsSQL = "SELECT (key_value_array($1::int[])).*"
+
+type labelQuerier interface {
+	getLabelsForIds(ids []int64) (lls labels.Labels, err error)
+}
+
+func (q *pgxQuerier) getPrompbLabelsForIds(ids []int64) (lls []prompb.Label, err error) {
+	ll, err := q.getLabelsForIds(ids)
+	if err != nil {
+		return
+	}
+	lls = make([]prompb.Label, len(ll))
+	for i := range ll {
+		lls[i] = prompb.Label{Name: ll[i].Name, Value: ll[i].Value}
+	}
+	return
+}
+
+func (q *pgxQuerier) getLabelsForIds(ids []int64) (lls labels.Labels, err error) {
+	keys := make([]interface{}, len(ids))
+	values := make([]interface{}, len(ids))
+	for i := range ids {
+		keys[i] = ids[i]
+	}
+	numHits := q.labels.GetValues(keys, values)
+
+	if numHits < len(ids) {
+		err = q.fetchMissingLabels(keys[numHits:], ids[numHits:], values[numHits:])
+		if err != nil {
+			return
+		}
+	}
+
+	lls = make([]labels.Label, 0, len(values))
+	for i := range values {
+		lls = append(lls, values[i].(labels.Label))
+	}
+
+	return
+}
+
+func (q *pgxQuerier) fetchMissingLabels(misses []interface{}, missedIds []int64, newLabels []interface{}) error {
+	for i := range misses {
+		missedIds[i] = misses[i].(int64)
+	}
+	rows, err := q.conn.Query(context.Background(), GetLabelsSQL, missedIds)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var keys []string
+		var vals []string
+		err = rows.Scan(&keys, &vals)
+		if err != nil {
+			return err
+		}
+		if len(keys) != len(vals) {
+			return fmt.Errorf("query returned a mismatch in timestamps and values: %d, %d", len(keys), len(vals))
+		}
+		if len(keys) != len(misses) {
+			return fmt.Errorf("query returned wrong number of labels: %d, %d", len(misses), len(keys))
+		}
+
+		for i := range misses {
+			newLabels[i] = labels.Label{Name: keys[i], Value: vals[i]}
+		}
+		numInserted := q.labels.InsertBatch(misses, newLabels)
+		if numInserted < len(misses) {
+			log.Warn("msg", "labels cache starving, may need to increase size")
+		}
+	}
+	return nil
+}
+
 func (q *pgxQuerier) getResultRows(startTimestamp int64, endTimestamp int64, hints *storage.SelectHints, path []parser.Node, matchers []*labels.Matcher) ([]pgx.Rows, parser.Node, error) {

 	metric, cases, values, err := buildSubQueries(matchers)
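Note that buildSeriesSet and buildTimeSeries now receive the querier itself, presumably so that row construction can resolve label ids through the new labelQuerier interface. A minimal sketch of a consumer of that interface; the helper below is hypothetical and not part of the commit.

package sketch

import "github.com/prometheus/prometheus/pkg/labels"

// labelQuerier mirrors the interface added in this commit: it turns label ids
// into full label sets, consulting the cache before touching the database.
type labelQuerier interface {
	getLabelsForIds(ids []int64) (lls labels.Labels, err error)
}

// resolveSeriesLabels is a hypothetical helper showing the intended use: a
// result row carries only label ids, and the querier expands them to labels,
// issuing key_value_array only for ids missing from the cache.
func resolveSeriesLabels(q labelQuerier, labelIds []int64) (labels.Labels, error) {
	return q.getLabelsForIds(labelIds)
}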
