Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 8ba45d2

Browse files
Cache label IDs
Add an inverted cache ((metric + label pair) -> (id, pos)) to avoid DB calls for fetching label IDs when the series ID is not cached. This cache is only used when ingesting data. Benchmarks show roughly 5-10% gains in ingest performance and about 25% fewer DB calls for fetching label IDs (note that these numbers depend heavily on the shape of the dataset).
1 parent cb9ec6f commit 8ba45d2

16 files changed

+176
-85
lines changed

pkg/pgclient/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
137137
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
138138
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
139139
c := ingestor.Cfg{
140-
NumCopiers: numCopiers,
141-
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
142-
AsyncAcks: cfg.AsyncAcks,
140+
NumCopiers: numCopiers,
141+
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
142+
AsyncAcks: cfg.AsyncAcks,
143+
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
143144
}
144145

145146
labelsReader := lreader.NewLabelsReader(dbConn, labelsCache, mt.ReadAuthorizer())

pkg/pgmodel/cache/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
const (
1616
DefaultMetricCacheSize = 10000
17+
DefaultLabelsCacheSize = 100000
1718
)
1819

1920
type LabelsCache interface {

pkg/pgmodel/cache/flags.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,17 @@ type Config struct {
3030
MetricsCacheSize uint64
3131
LabelsCacheSize uint64
3232
ExemplarKeyPosCacheSize uint64
33+
InvertedLabelsCacheSize uint64
3334
}
3435

3536
var DefaultConfig = Config{
3637
SeriesCacheInitialSize: DefaultSeriesCacheSize,
3738
SeriesCacheMemoryMaxBytes: 1000000,
3839

3940
MetricsCacheSize: DefaultMetricCacheSize,
40-
LabelsCacheSize: 1000,
41+
LabelsCacheSize: DefaultLabelsCacheSize,
4142
ExemplarKeyPosCacheSize: DefaultExemplarKeyPosCacheSize,
43+
InvertedLabelsCacheSize: DefaultInvertedLabelsCacheSize,
4244
}
4345

4446
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
@@ -47,11 +49,12 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
4749

4850
fs.Uint64Var(&cfg.MetricsCacheSize, "metrics.cache.metrics.size", DefaultMetricCacheSize, "Maximum number of metric names to cache.")
4951
fs.Uint64Var(&cfg.SeriesCacheInitialSize, "metrics.cache.series.initial-size", DefaultSeriesCacheSize, "Maximum number of series to cache.")
50-
fs.Uint64Var(&cfg.LabelsCacheSize, "metrics.cache.labels.size", 10000, "Maximum number of labels to cache.")
52+
fs.Uint64Var(&cfg.LabelsCacheSize, "metrics.cache.labels.size", DefaultLabelsCacheSize, "Maximum number of labels to cache.")
5153
fs.Uint64Var(&cfg.ExemplarKeyPosCacheSize, "metrics.cache.exemplar.size", DefaultExemplarKeyPosCacheSize, "Maximum number of exemplar metrics key-position to cache. "+
5254
"It has one-to-one mapping with number of metrics that have exemplar, as key positions are saved per metric basis.")
5355
fs.Var(&cfg.seriesCacheMemoryMaxFlag, "metrics.cache.series.max-bytes", "Initial number of elements in the series cache. "+
5456
"Specified in bytes or as a percentage of the memory-target (e.g. 50%).")
57+
fs.Uint64Var(&cfg.InvertedLabelsCacheSize, "metrics.cache.inverted-labels.size", DefaultInvertedLabelsCacheSize, "Maximum number of label-ids to cache. This helps increase ingest performance.")
5558
return cfg
5659
}
5760

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/timescale/promscale/pkg/clockcache"
7+
)
8+
9+
const DefaultInvertedLabelsCacheSize = 500000
10+
11+
type LabelInfo struct {
12+
LabelID int32 // id of label
13+
Pos int32 // position of specific label within a specific metric.
14+
}
15+
16+
type LabelKey struct {
17+
MetricName, Name, Value string
18+
}
19+
20+
func NewLabelKey(metricName, name, value string) LabelKey {
21+
return LabelKey{MetricName: metricName, Name: name, Value: value}
22+
}
23+
24+
func (lk LabelKey) len() int {
25+
return len(lk.MetricName) + len(lk.Name) + len(lk.Value)
26+
}
27+
28+
func NewLabelInfo(lableID, pos int32) LabelInfo {
29+
return LabelInfo{LabelID: lableID, Pos: pos}
30+
}
31+
32+
func (li LabelInfo) len() int {
33+
return 8
34+
}
35+
36+
// (metric, label key-pair) -> (label id,label position) cache
37+
// Used when creating series to avoid DB calls for labels
38+
// Each label position is unique for a specific metric, meaning that
39+
// one label can have different position for different metrics
40+
type InvertedLabelsCache struct {
41+
cache *clockcache.Cache
42+
}
43+
44+
// Cache is thread-safe
45+
func NewInvertedLablesCache(size uint64) (*InvertedLabelsCache, error) {
46+
if size <= 0 {
47+
return nil, fmt.Errorf("labels cache size must be > 0")
48+
}
49+
cache := clockcache.WithMetrics("inverted_labels", "metric", size)
50+
return &InvertedLabelsCache{cache}, nil
51+
}
52+
53+
func (c *InvertedLabelsCache) GetLabelsId(key LabelKey) (LabelInfo, bool) {
54+
id, found := c.cache.Get(key)
55+
if found {
56+
return id.(LabelInfo), found
57+
}
58+
return LabelInfo{}, false
59+
}
60+
61+
func (c *InvertedLabelsCache) Put(key LabelKey, val LabelInfo) bool {
62+
_, added := c.cache.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
63+
return added
64+
}

pkg/pgmodel/ingestor/dispatcher.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type pgxDispatcher struct {
5151

5252
var _ model.Dispatcher = &pgxDispatcher{}
5353

54-
func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
54+
func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
5555
numCopiers := cfg.NumCopiers
5656
if numCopiers < 1 {
5757
log.Warn("msg", "num copiers less than 1, setting to 1")
@@ -76,7 +76,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach
7676
}
7777

7878
labelArrayOID := model.GetCustomTypeOID(model.LabelArray)
79-
sw := NewSeriesWriter(conn, labelArrayOID)
79+
labelsCache, err := cache.NewInvertedLablesCache(cfg.InvertedLabelsCacheSize)
80+
if err != nil {
81+
return nil, err
82+
}
83+
sw := NewSeriesWriter(conn, labelArrayOID, labelsCache)
8084
elf := NewExamplarLabelFormatter(conn, eCache)
8185

8286
for i := 0; i < numCopiers; i++ {
@@ -85,7 +89,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach
8589

8690
inserter := &pgxDispatcher{
8791
conn: conn,
88-
metricTableNames: cache,
92+
metricTableNames: mCache,
8993
scache: scache,
9094
exemplarKeyPosCache: eCache,
9195
completeMetricCreation: make(chan struct{}, 1),

pkg/pgmodel/ingestor/handler_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ func getSeries(t *testing.T, scache *cache.SeriesCacheImpl, labels labels.Labels
1919
return series
2020
}
2121

22-
func makeLabelKey(l labels.Label) labelKey {
23-
return labelKey{MetricName: "metric", Name: l.Name, Value: l.Value}
22+
func makeLabelKey(l labels.Label) cache.LabelKey {
23+
return cache.LabelKey{MetricName: "metric", Name: l.Name, Value: l.Value}
2424
}
2525

2626
func TestLabelArrayCreator(t *testing.T) {
@@ -31,9 +31,9 @@ func TestLabelArrayCreator(t *testing.T) {
3131
seriesSet := []*model.Series{
3232
getSeries(t, scache, labels.Labels{metricNameLabel, valOne}),
3333
}
34-
labelMap := map[labelKey]labelInfo{
35-
makeLabelKey(metricNameLabel): {2, 1},
36-
makeLabelKey(valOne): {3, 2},
34+
labelMap := map[cache.LabelKey]cache.LabelInfo{
35+
makeLabelKey(metricNameLabel): cache.NewLabelInfo(2, 1),
36+
makeLabelKey(valOne): cache.NewLabelInfo(3, 2),
3737
}
3838

3939
res, _, err := createLabelArrays(seriesSet, labelMap, 2)
@@ -46,7 +46,7 @@ func TestLabelArrayCreator(t *testing.T) {
4646
expected = [][]int32{{2, 3}}
4747
require.Equal(t, res, expected)
4848

49-
labelMap[makeLabelKey(valOne)] = labelInfo{3, 3}
49+
labelMap[makeLabelKey(valOne)] = cache.NewLabelInfo(3, 3)
5050
res, _, err = createLabelArrays(seriesSet, labelMap, 3)
5151
require.NoError(t, err)
5252
expected = [][]int32{{2, 0, 3}}
@@ -57,10 +57,10 @@ func TestLabelArrayCreator(t *testing.T) {
5757
getSeries(t, scache, labels.Labels{metricNameLabel, valOne}),
5858
getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}),
5959
}
60-
labelMap = map[labelKey]labelInfo{
61-
makeLabelKey(metricNameLabel): {100, 1},
62-
makeLabelKey(valOne): {1, 5},
63-
makeLabelKey(valTwo): {2, 5},
60+
labelMap = map[cache.LabelKey]cache.LabelInfo{
61+
makeLabelKey(metricNameLabel): cache.NewLabelInfo(100, 1),
62+
makeLabelKey(valOne): cache.NewLabelInfo(1, 5),
63+
makeLabelKey(valTwo): cache.NewLabelInfo(2, 5),
6464
}
6565

6666
res, ser, err := createLabelArrays(seriesSet, labelMap, 5)
@@ -79,10 +79,10 @@ func TestLabelArrayCreator(t *testing.T) {
7979
getSeries(t, scache, labels.Labels{metricNameLabel, valOne}),
8080
setSeries,
8181
}
82-
labelMap = map[labelKey]labelInfo{
83-
makeLabelKey(metricNameLabel): {100, 1},
84-
makeLabelKey(valOne): {1, 5},
85-
makeLabelKey(valTwo): {2, 5},
82+
labelMap = map[cache.LabelKey]cache.LabelInfo{
83+
makeLabelKey(metricNameLabel): cache.NewLabelInfo(100, 1),
84+
makeLabelKey(valOne): cache.NewLabelInfo(1, 5),
85+
makeLabelKey(valTwo): cache.NewLabelInfo(2, 5),
8686
}
8787
res, ser, err = createLabelArrays(seriesSet, labelMap, 5)
8888
require.NoError(t, err)

pkg/pgmodel/ingestor/ingestor.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import (
2424
)
2525

2626
type Cfg struct {
27-
AsyncAcks bool
28-
NumCopiers int
29-
DisableEpochSync bool
30-
IgnoreCompressedChunks bool
27+
AsyncAcks bool
28+
NumCopiers int
29+
DisableEpochSync bool
30+
IgnoreCompressedChunks bool
31+
InvertedLabelsCacheSize uint64
3132
}
3233

3334
// DBIngestor ingest the TimeSeries data into Timescale database.
@@ -57,7 +58,7 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.
5758
// with an empty config, a new default size metrics cache and a non-ha-aware data parser
5859
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
5960
if cfg == nil {
60-
cfg = &Cfg{}
61+
cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize}
6162
}
6263
cacheConfig := cache.DefaultConfig
6364
c := cache.NewMetricCache(cacheConfig)

pkg/pgmodel/ingestor/ingestor_sql_test.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ func TestPGXInserterInsertSeries(t *testing.T) {
274274
mock := model.NewSqlRecorder(c.sqlQueries, t)
275275
scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
276276
scache.Reset()
277-
278-
sw := NewSeriesWriter(mock, 0)
277+
lCache, _ := cache.NewInvertedLablesCache(10)
278+
sw := NewSeriesWriter(mock, 0, lCache)
279279

280280
lsi := make([]model.Insertable, 0)
281281
for _, ser := range c.series {
@@ -391,21 +391,6 @@ func TestPGXInserterCacheReset(t *testing.T) {
391391
},
392392
{Sql: "COMMIT;"},
393393
{Sql: "BEGIN;"},
394-
{
395-
Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)",
396-
Args: []interface{}{
397-
"metric_1",
398-
tableName,
399-
[]string{"__name__", "name_1", "name_1"},
400-
[]string{"metric_1", "value_1", "value_2"},
401-
},
402-
Results: model.RowResults{
403-
{[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}},
404-
},
405-
Err: error(nil),
406-
},
407-
{Sql: "COMMIT;"},
408-
{Sql: "BEGIN;"},
409394
{
410395
Sql: seriesInsertSQL,
411396
Args: []interface{}{
@@ -432,8 +417,8 @@ func TestPGXInserterCacheReset(t *testing.T) {
432417

433418
mock := model.NewSqlRecorder(sqlQueries, t)
434419
scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
435-
436-
sw := NewSeriesWriter(mock, 0)
420+
lcache, _ := cache.NewInvertedLablesCache(10)
421+
sw := NewSeriesWriter(mock, 0, lcache)
437422
inserter := pgxDispatcher{
438423
conn: mock,
439424
scache: scache,
@@ -848,7 +833,7 @@ func TestPGXInserterInsertData(t *testing.T) {
848833
if err != nil {
849834
t.Fatalf("error setting up mock cache: %s", err.Error())
850835
}
851-
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true})
836+
inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10})
852837
if err != nil {
853838
t.Fatal(err)
854839
}

0 commit comments

Comments
 (0)