Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 645fcb9

Browse files
committed
Reset inverted labels cache on epoch change
The database cache epoch advances when series are deleted from the database. When the connector detects a cache epoch change, it invalidates the series cache, as it may contain values which reference the deleted rows in the series table. During the database series deletion, unused labels may also be deleted. This means that we must also invalidate the inverted label cache, as it could also reference deleted rows in the `_prom_catalog.label` table.
1 parent 9b27bd4 commit 645fcb9

File tree

5 files changed

+35
-8
lines changed

5 files changed

+35
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ We use the following categories for changes:
2525

2626
### Fixed
2727
- Make Jaeger Event queryable using name and tags [#1553]
28+
- Reset inverted labels cache on epoch change [#1561]
2829

2930
## [0.13.0] - 2022-07-20
3031

pkg/pgmodel/Readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ TODO (@ante do you want to write this?)
1515

1616
## Write Path
1717

18-
A `WriteRequest` is made out of a `[]TimeSeries` each of which contains a set of lables, each a `{string, string}`, and a set of samples, each a `{Timestamp, Value}`. Logically, the write path deals with this write request multiple stages. In Go notation, this can be thought of as follows
18+
A `WriteRequest` is made out of a `[]TimeSeries` each of which contains a set of labels, each a `{string, string}`, and a set of samples, each a `{Timestamp, Value}`. Logically, the write path deals with this write request multiple stages. In Go notation, this can be thought of as follows
1919

2020
```go
2121
type WriteRequest = []TimeSeries

pkg/pgmodel/cache/inverted_labels_cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type InvertedLabelsCache struct {
4242
}
4343

4444
// Cache is thread-safe
45-
func NewInvertedLablesCache(size uint64) (*InvertedLabelsCache, error) {
45+
func NewInvertedLabelsCache(size uint64) (*InvertedLabelsCache, error) {
4646
if size <= 0 {
4747
return nil, fmt.Errorf("labels cache size must be > 0")
4848
}
@@ -62,3 +62,7 @@ func (c *InvertedLabelsCache) Put(key LabelKey, val LabelInfo) bool {
6262
_, added := c.cache.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
6363
return added
6464
}
65+
66+
func (c *InvertedLabelsCache) Reset() {
67+
c.cache.Reset()
68+
}

pkg/pgmodel/ingestor/dispatcher.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type pgxDispatcher struct {
3838
conn pgxconn.PgxConn
3939
metricTableNames cache.MetricCache
4040
scache cache.SeriesCache
41+
invertedLabelsCache *cache.InvertedLabelsCache
4142
exemplarKeyPosCache cache.PositionCache
4243
batchers sync.Map
4344
completeMetricCreation chan struct{}
@@ -76,7 +77,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
7677
}
7778

7879
labelArrayOID := model.GetCustomTypeOID(model.LabelArray)
79-
labelsCache, err := cache.NewInvertedLablesCache(cfg.InvertedLabelsCacheSize)
80+
labelsCache, err := cache.NewInvertedLabelsCache(cfg.InvertedLabelsCacheSize)
8081
if err != nil {
8182
return nil, err
8283
}
@@ -91,6 +92,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
9192
conn: conn,
9293
metricTableNames: mCache,
9394
scache: scache,
95+
invertedLabelsCache: labelsCache,
9496
exemplarKeyPosCache: eCache,
9597
completeMetricCreation: make(chan struct{}, 1),
9698
asyncAcks: cfg.AsyncAcks,
@@ -156,10 +158,14 @@ func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (mod
156158
if err != nil {
157159
// Trash the cache just in case an epoch change occurred, seems safer
158160
p.scache.Reset()
161+
// Also trash the inverted labels cache, which can also be invalidated when the series cache is
162+
p.invertedLabelsCache.Reset()
159163
return model.InvalidSeriesEpoch, err
160164
}
161165
if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch {
162166
p.scache.Reset()
167+
// If the series cache needs to be invalidated, so does the inverted labels cache
168+
p.invertedLabelsCache.Reset()
163169
}
164170
return dbEpoch, nil
165171
}

pkg/pgmodel/ingestor/ingestor_sql_test.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
276276
mock := model.NewSqlRecorder(c.sqlQueries, t)
277277
scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
278278
scache.Reset()
279-
lCache, _ := cache.NewInvertedLablesCache(10)
279+
lCache, _ := cache.NewInvertedLabelsCache(10)
280280
sw := NewSeriesWriter(mock, 0, lCache)
281281

282282
lsi := make([]model.Insertable, 0)
@@ -393,6 +393,21 @@ func TestPGXInserterCacheReset(t *testing.T) {
393393
},
394394
{Sql: "COMMIT;"},
395395
{Sql: "BEGIN;"},
396+
{
397+
Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)",
398+
Args: []interface{}{
399+
"metric_1",
400+
tableName,
401+
[]string{"__name__", "name_1", "name_1"},
402+
[]string{"metric_1", "value_1", "value_2"},
403+
},
404+
Results: model.RowResults{
405+
{[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}},
406+
},
407+
Err: error(nil),
408+
},
409+
{Sql: "COMMIT;"},
410+
{Sql: "BEGIN;"},
396411
{
397412
Sql: seriesInsertSQL,
398413
Args: []interface{}{
@@ -419,11 +434,12 @@ func TestPGXInserterCacheReset(t *testing.T) {
419434

420435
mock := model.NewSqlRecorder(sqlQueries, t)
421436
scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
422-
lcache, _ := cache.NewInvertedLablesCache(10)
437+
lcache, _ := cache.NewInvertedLabelsCache(10)
423438
sw := NewSeriesWriter(mock, 0, lcache)
424439
inserter := pgxDispatcher{
425-
conn: mock,
426-
scache: scache,
440+
conn: mock,
441+
scache: scache,
442+
invertedLabelsCache: lcache,
427443
}
428444

429445
makeSamples := func(series []labels.Labels) []model.Insertable {
@@ -460,7 +476,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
460476
}
461477
}
462478

463-
// refreshing during the same epoch givesthe same IDs without checking the DB
479+
// refreshing during the same epoch gives the same IDs without checking the DB
464480
_, err = inserter.refreshSeriesEpoch(1)
465481
require.NoError(t, err)
466482

0 commit comments

Comments
 (0)