Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 19dc42d

Browse files
Grow inverted labels cache if needed
1 parent 92ad453 commit 19dc42d

File tree

11 files changed

+139
-125
lines changed

11 files changed

+139
-125
lines changed

pkg/pgclient/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
202202
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
203203
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
204204
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
205+
invertedLabelsCache := cache.NewInvertedLabelsCache(cfg.CacheConfig, sigClose)
206+
205207
c := ingestor.Cfg{
206208
NumCopiers: numCopiers,
207209
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
@@ -230,7 +232,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
230232
if !readOnly {
231233
var err error
232234
writerConn = pgxconn.NewPgxConn(writerPool)
233-
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
235+
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, invertedLabelsCache, &c)
234236
if err != nil {
235237
log.Error("msg", "err starting the ingestor", "err", err)
236238
return nil, err

pkg/pgmodel/cache/cache.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,23 @@ package cache
66

77
import (
88
"fmt"
9+
"time"
910

1011
"github.com/timescale/promscale/pkg/clockcache"
12+
"github.com/timescale/promscale/pkg/log"
1113
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
1214
"github.com/timescale/promscale/pkg/pgmodel/model"
1315
)
1416

1517
const (
1618
DefaultMetricCacheSize = 10000
1719
DefaultLabelsCacheSize = 100000
20+
growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
21+
growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
1822
)
1923

24+
var evictionMaxAge = time.Minute * 5 // grow cache if we are evicting elements younger than `now - evictionMaxAge`
25+
2026
type LabelsCache interface {
2127
// GetValues tries to get a batch of keys and store the corresponding values in valuesOut
2228
// returns the number of keys that were actually found.
@@ -116,3 +122,64 @@ func (m *MetricNameCache) Evictions() uint64 {
116122
func NewLabelsCache(config Config) LabelsCache {
117123
return clockcache.WithMetrics("label", "metric", config.LabelsCacheSize)
118124
}
125+
126+
type ResizableCache struct {
127+
*clockcache.Cache
128+
maxSizeBytes uint64
129+
}
130+
131+
func NewResizableCache(cache *clockcache.Cache, maxBytes uint64, sigClose <-chan struct{}) *ResizableCache {
132+
rc := &ResizableCache{cache, maxBytes}
133+
if sigClose != nil {
134+
go rc.runSizeCheck(sigClose)
135+
}
136+
return rc
137+
}
138+
139+
func (rc *ResizableCache) runSizeCheck(sigClose <-chan struct{}) {
140+
ticker := time.NewTicker(growCheckDuration)
141+
for {
142+
select {
143+
case <-ticker.C:
144+
if rc.shouldGrow() {
145+
rc.grow()
146+
}
147+
case <-sigClose:
148+
return
149+
}
150+
}
151+
}
152+
153+
// shouldGrow allows cache growth if we are evicting elements that were recently used or inserted
154+
// evictionMaxAge defines the interval
155+
func (rc *ResizableCache) shouldGrow() bool {
156+
return rc.MaxEvictionTs()+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
157+
}
158+
159+
func (rc *ResizableCache) grow() {
160+
sizeBytes := rc.SizeBytes()
161+
oldSize := rc.Cap()
162+
if float64(sizeBytes)*1.2 >= float64(rc.maxSizeBytes) {
163+
log.Warn("msg", "Cache is too small and cannot be grown",
164+
"current_size_bytes", float64(sizeBytes), "max_size_bytes", float64(rc.maxSizeBytes),
165+
"current_size_elements", oldSize, "check_interval", growCheckDuration,
166+
"eviction_max_age", evictionMaxAge)
167+
return
168+
}
169+
170+
multiplier := growFactor
171+
if float64(sizeBytes)*multiplier >= float64(rc.maxSizeBytes) {
172+
multiplier = float64(rc.maxSizeBytes) / float64(sizeBytes)
173+
}
174+
if multiplier < 1.0 {
175+
return
176+
}
177+
178+
newNumElements := int(float64(oldSize) * multiplier)
179+
log.Info("msg", "Growing the series cache",
180+
"new_size_elements", newNumElements, "current_size_elements", oldSize,
181+
"new_size_bytes", float64(sizeBytes)*multiplier, "max_size_bytes", float64(rc.maxSizeBytes),
182+
"multiplier", multiplier,
183+
"eviction_max_age", evictionMaxAge)
184+
rc.ExpandTo(newNumElements)
185+
}

pkg/pgmodel/cache/flags.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,41 +20,52 @@ var (
2020
Name: "series_cache_max_bytes",
2121
Help: "The target for the maximum amount of memory the series_cache can use in bytes.",
2222
})
23+
InvertedLabelsCacheMaxBytesMetric = prometheus.NewGauge(
24+
prometheus.GaugeOpts{
25+
Namespace: util.PromNamespace,
26+
Name: "inverted_labels_cache_max_bytes",
27+
Help: "The target for the maximum amount of memory the inverted labels cache can use in bytes.",
28+
})
2329
)
2430

2531
type Config struct {
2632
SeriesCacheInitialSize uint64
2733
seriesCacheMemoryMaxFlag limits.PercentageAbsoluteBytesFlag
2834
SeriesCacheMemoryMaxBytes uint64
2935

30-
MetricsCacheSize uint64
31-
LabelsCacheSize uint64
32-
ExemplarKeyPosCacheSize uint64
33-
InvertedLabelsCacheSize uint64
36+
MetricsCacheSize uint64
37+
LabelsCacheSize uint64
38+
ExemplarKeyPosCacheSize uint64
39+
InvertedLabelsCacheSize uint64
40+
InvertedLabelsCacheMaxBytesFlag limits.PercentageAbsoluteBytesFlag
41+
InvertedLabelsCacheMaxBytes uint64
3442
}
3543

3644
var DefaultConfig = Config{
3745
SeriesCacheInitialSize: DefaultSeriesCacheSize,
3846
SeriesCacheMemoryMaxBytes: 1000000,
3947

40-
MetricsCacheSize: DefaultMetricCacheSize,
41-
LabelsCacheSize: DefaultLabelsCacheSize,
42-
ExemplarKeyPosCacheSize: DefaultExemplarKeyPosCacheSize,
43-
InvertedLabelsCacheSize: DefaultInvertedLabelsCacheSize,
48+
MetricsCacheSize: DefaultMetricCacheSize,
49+
LabelsCacheSize: DefaultLabelsCacheSize,
50+
ExemplarKeyPosCacheSize: DefaultExemplarKeyPosCacheSize,
51+
InvertedLabelsCacheSize: DefaultInvertedLabelsCacheSize,
52+
InvertedLabelsCacheMaxBytes: 1000000,
4453
}
4554

4655
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
4756
/* set defaults */
4857
cfg.seriesCacheMemoryMaxFlag.SetPercent(50)
58+
cfg.InvertedLabelsCacheMaxBytesFlag.SetPercent(10)
4959

5060
fs.Uint64Var(&cfg.MetricsCacheSize, "metrics.cache.metrics.size", DefaultMetricCacheSize, "Maximum number of metric names to cache.")
5161
fs.Uint64Var(&cfg.SeriesCacheInitialSize, "metrics.cache.series.initial-size", DefaultSeriesCacheSize, "Maximum number of series to cache.")
5262
fs.Uint64Var(&cfg.LabelsCacheSize, "metrics.cache.labels.size", DefaultLabelsCacheSize, "Maximum number of labels to cache.")
5363
fs.Uint64Var(&cfg.ExemplarKeyPosCacheSize, "metrics.cache.exemplar.size", DefaultExemplarKeyPosCacheSize, "Maximum number of exemplar metrics key-position to cache. "+
5464
"It has one-to-one mapping with number of metrics that have exemplar, as key positions are saved per metric basis.")
5565
fs.Var(&cfg.seriesCacheMemoryMaxFlag, "metrics.cache.series.max-bytes", "Initial number of elements in the series cache. "+
56-
"Specified in bytes or as a percentage of the memory-target (e.g. 50%).")
66+
"Specified in bytes or as a percentage of the cache.memory-target (e.g. 50%).")
5767
fs.Uint64Var(&cfg.InvertedLabelsCacheSize, "metrics.cache.inverted-labels.size", DefaultInvertedLabelsCacheSize, "Maximum number of label-ids to cache. This helps increase ingest performance.")
68+
fs.Var(&cfg.InvertedLabelsCacheMaxBytesFlag, "metrics.cache.inverted-labels.max-bytes", "Initial size of elements in the invreted labels cache. Specified in bytes or as a percentage of the cache.memory-target (e.g. 50%).")
5869
return cfg
5970
}
6071

@@ -66,14 +77,32 @@ func Validate(cfg *Config, lcfg limits.Config) error {
6677
case limits.Absolute:
6778
cfg.SeriesCacheMemoryMaxBytes = value
6879
default:
69-
return fmt.Errorf("series-cache-max-bytes flag has unknown kind")
80+
return fmt.Errorf("metrics.cache.series.max-bytes flag has unknown kind")
81+
}
82+
if cfg.SeriesCacheMemoryMaxBytes > lcfg.TargetMemoryBytes {
83+
return fmt.Errorf("The metrics.cache.series.max-bytes must be smaller than the cache.memory-target")
7084
}
7185
SeriesCacheMaxBytesMetric.Set(float64(cfg.SeriesCacheMemoryMaxBytes))
7286

73-
if cfg.SeriesCacheMemoryMaxBytes > lcfg.TargetMemoryBytes {
74-
return fmt.Errorf("The series-cache-max-bytes must be smaller than the memory-target")
87+
kind, value = cfg.InvertedLabelsCacheMaxBytesFlag.Get()
88+
switch kind {
89+
case limits.Percentage:
90+
cfg.InvertedLabelsCacheMaxBytes = uint64(float64(lcfg.TargetMemoryBytes) * (float64(value) / 100.0))
91+
case limits.Absolute:
92+
cfg.InvertedLabelsCacheMaxBytes = value
93+
default:
94+
return fmt.Errorf("metrics.cache.inverted-labels.max-bytes flag has unknown kind")
95+
}
96+
97+
if cfg.InvertedLabelsCacheMaxBytes > lcfg.TargetMemoryBytes {
98+
return fmt.Errorf("The metrics.cache.inverted-labels.max-bytes must be smaller than the cache.memory-target")
99+
}
100+
101+
if cfg.SeriesCacheMemoryMaxBytes+cfg.InvertedLabelsCacheMaxBytes > lcfg.TargetMemoryBytes {
102+
return fmt.Errorf("Sum of metrics.cache.series.max-bytes and metrics.cache.inverted-labels.max-bytes must be smaller than the cache.memory-target")
75103
}
76104

105+
InvertedLabelsCacheMaxBytesMetric.Set(float64(cfg.InvertedLabelsCacheMaxBytes))
77106
return nil
78107
}
79108

pkg/pgmodel/cache/inverted_labels_cache.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package cache
22

33
import (
4-
"fmt"
5-
64
"github.com/timescale/promscale/pkg/clockcache"
75
)
86

@@ -38,31 +36,24 @@ func (li LabelInfo) len() int {
3836
// Each label position is unique for a specific metric, meaning that
3937
// one label can have different position for different metrics
4038
type InvertedLabelsCache struct {
41-
cache *clockcache.Cache
39+
*ResizableCache
4240
}
4341

4442
// Cache is thread-safe
45-
func NewInvertedLabelsCache(size uint64) (*InvertedLabelsCache, error) {
46-
if size <= 0 {
47-
return nil, fmt.Errorf("labels cache size must be > 0")
48-
}
49-
cache := clockcache.WithMetrics("inverted_labels", "metric", size)
50-
return &InvertedLabelsCache{cache}, nil
43+
func NewInvertedLabelsCache(config Config, sigClose chan struct{}) *InvertedLabelsCache {
44+
cache := clockcache.WithMetrics("inverted_labels", "metric", config.InvertedLabelsCacheSize)
45+
return &InvertedLabelsCache{NewResizableCache(cache, config.InvertedLabelsCacheMaxBytes, sigClose)}
5146
}
5247

5348
func (c *InvertedLabelsCache) GetLabelsId(key LabelKey) (LabelInfo, bool) {
54-
id, found := c.cache.Get(key)
49+
id, found := c.Get(key)
5550
if found {
5651
return id.(LabelInfo), found
5752
}
5853
return LabelInfo{}, false
5954
}
6055

6156
func (c *InvertedLabelsCache) Put(key LabelKey, val LabelInfo) bool {
62-
_, added := c.cache.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
57+
_, added := c.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
6358
return added
6459
}
65-
66-
func (c *InvertedLabelsCache) Reset() {
67-
c.cache.Reset()
68-
}

pkg/pgmodel/cache/series_cache.go

Lines changed: 5 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"math"
1212
"sort"
1313
"sync"
14-
"time"
1514
"unsafe"
1615

1716
"github.com/prometheus/prometheus/model/labels"
@@ -24,10 +23,6 @@ import (
2423
// this seems like a good default size for /active/ series. This results in Promscale using around 360MB on start.
2524
const DefaultSeriesCacheSize = 1000000
2625

27-
const growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
28-
const growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
29-
var evictionMaxAge = time.Minute * 5 // grow cache if we are evicting elements younger than `now - evictionMaxAge`
30-
3126
// SeriesCache is a cache of model.Series entries.
3227
type SeriesCache interface {
3328
Reset()
@@ -38,91 +33,20 @@ type SeriesCache interface {
3833
}
3934

4035
type SeriesCacheImpl struct {
41-
cache *clockcache.Cache
42-
maxSizeBytes uint64
36+
*ResizableCache
4337
}
4438

4539
func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
46-
cache := &SeriesCacheImpl{
40+
return &SeriesCacheImpl{NewResizableCache(
4741
clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
4842
config.SeriesCacheMemoryMaxBytes,
49-
}
50-
51-
if sigClose != nil {
52-
go cache.runSizeCheck(sigClose)
53-
}
54-
return cache
55-
}
56-
57-
func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
58-
ticker := time.NewTicker(growCheckDuration)
59-
for {
60-
select {
61-
case <-ticker.C:
62-
if t.shouldGrow() {
63-
t.grow()
64-
}
65-
case <-sigClose:
66-
return
67-
}
68-
}
69-
}
70-
71-
// shouldGrow allows cache growth if we are evicting elements that were recently used or inserted
72-
// evictionMaxAge defines the interval
73-
func (t *SeriesCacheImpl) shouldGrow() bool {
74-
return t.cache.MaxEvictionTs()+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
75-
}
76-
77-
func (t *SeriesCacheImpl) grow() {
78-
sizeBytes := t.cache.SizeBytes()
79-
oldSize := t.cache.Cap()
80-
if float64(sizeBytes)*1.2 >= float64(t.maxSizeBytes) {
81-
log.Warn("msg", "Series cache is too small and cannot be grown",
82-
"current_size_bytes", float64(sizeBytes), "max_size_bytes", float64(t.maxSizeBytes),
83-
"current_size_elements", oldSize, "check_interval", growCheckDuration,
84-
"eviction_max_age", evictionMaxAge)
85-
return
86-
}
87-
88-
multiplier := growFactor
89-
if float64(sizeBytes)*multiplier >= float64(t.maxSizeBytes) {
90-
multiplier = float64(t.maxSizeBytes) / float64(sizeBytes)
91-
}
92-
if multiplier < 1.0 {
93-
return
94-
}
95-
96-
newNumElements := int(float64(oldSize) * multiplier)
97-
log.Info("msg", "Growing the series cache",
98-
"new_size_elements", newNumElements, "current_size_elements", oldSize,
99-
"new_size_bytes", float64(sizeBytes)*multiplier, "max_size_bytes", float64(t.maxSizeBytes),
100-
"multiplier", multiplier,
101-
"eviction_max_age", evictionMaxAge)
102-
t.cache.ExpandTo(newNumElements)
103-
}
104-
105-
func (t *SeriesCacheImpl) Len() int {
106-
return t.cache.Len()
107-
}
108-
109-
func (t *SeriesCacheImpl) Cap() int {
110-
return t.cache.Cap()
111-
}
112-
113-
func (t *SeriesCacheImpl) Evictions() uint64 {
114-
return t.cache.Evictions()
115-
}
116-
117-
// Reset should be concurrency-safe
118-
func (t *SeriesCacheImpl) Reset() {
119-
t.cache.Reset()
43+
sigClose)}
12044
}
12145

12246
// Get the canonical version of a series if one exists.
12347
// input: the string representation of a Labels as defined by generateKey()
12448
func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
125-
val, ok := t.cache.Get(str)
49+
val, ok := t.Get(str)
12650
if !ok {
12751
return nil
12852
}
@@ -134,7 +58,7 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
13458
// the event of multiple goroutines setting labels concurrently).
13559
func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series {
13660
//str not counted twice in size since the key and lset.str will point to same thing.
137-
val, inCache := t.cache.Insert(str, lset, lset.FinalSizeBytes())
61+
val, inCache := t.Insert(str, lset, lset.FinalSizeBytes())
13862
if !inCache {
13963
// It seems that cache was full and eviction failed to remove
14064
// element due to starvation caused by a lot of concurrent gets

pkg/pgmodel/cache/series_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestGrowSeriesCache(t *testing.T) {
8080
cache := NewSeriesCache(Config{SeriesCacheInitialSize: 100, SeriesCacheMemoryMaxBytes: DefaultConfig.SeriesCacheMemoryMaxBytes}, nil)
8181
cacheGrowCounter := 0
8282
for i := 0; i < 200; i++ {
83-
cache.cache.Insert(i, i, 1)
83+
cache.Insert(i, i, 1)
8484
if i%100 == 0 {
8585
time.Sleep(tc.sleep)
8686
}

0 commit comments

Comments
 (0)