
Commit 093802c

Decompress automatically if trying to insert into a compressed chunk
This may happen if Prometheus is replaying old data.
1 parent e70f584 commit 093802c

4 files changed: +167, -4 lines


pkg/pgmodel/end_to_end_tests/create_test.go

Lines changed: 56 additions & 0 deletions
@@ -9,6 +9,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/jackc/pgconn"
+	pgx "github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/timescale/timescale-prometheus/pkg/prompb"
 
@@ -448,3 +450,57 @@ func TestSQLIngest(t *testing.T) {
 		})
 	}
 }
+
+func TestInsertCompressed(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		ts := []prompb.TimeSeries{
+			{
+				Labels: []prompb.Label{
+					{Name: MetricNameLabelName, Value: "test"},
+					{Name: "test", Value: "test"},
+				},
+				Samples: []prompb.Sample{
+					{Timestamp: 1, Value: 0.1},
+				},
+			},
+		}
+		ingestor, err := NewPgxIngestor(db)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer ingestor.Close()
+		_, err = ingestor.Ingest(ts, NewWriteRequest())
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = ingestor.CompleteMetricCreation()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.test') i;")
+		if err != nil {
+			if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.SQLState() == "42710" {
+				//already compressed (could happen if policy already ran). This is fine
+			} else {
+				t.Fatal(err)
+			}
+		}
+		//ingest after compression
+		_, err = ingestor.Ingest(ts, NewWriteRequest())
+		if err != nil {
+			t.Fatal(err)
+		}
+		var nextStartAfter time.Time
+		err = db.QueryRow(context.Background(), "SELECT next_start FROM timescaledb_information.policy_stats WHERE hypertable = $1::text::regclass", pgx.Identifier{"prom_data", "test"}.Sanitize()).Scan(&nextStartAfter)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if time.Until(nextStartAfter) < time.Hour*10 {
+			t.Error("next_start was not changed enough")
+		}
+	})
+}
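The test asserts that the compression job's next_start was pushed out, but it does not directly check that the chunk was decompressed. A helper along the following lines could verify that as well. This is a sketch, not part of the commit; it is written against the same test package (so *pgxpool.Pool and context are already imported) and reads the same TimescaleDB catalog columns that the new decompress_chunks_after procedure uses.

func compressedChunkCount(db *pgxpool.Pool, schema, table string) (int, error) {
	// Count chunks of the given hypertable that still point at a compressed chunk.
	var n int
	err := db.QueryRow(context.Background(),
		`SELECT count(*)
		   FROM _timescaledb_catalog.chunk c
		   INNER JOIN _timescaledb_catalog.hypertable h ON h.id = c.hypertable_id
		   WHERE h.schema_name = $1 AND h.table_name = $2
		     AND c.compressed_chunk_id IS NOT NULL`,
		schema, table).Scan(&n)
	return n, err
}

Calling compressedChunkCount(db, "prom_data", "test") after the second Ingest should return 0 once the decompress-and-retry path has run.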

pkg/pgmodel/migrations/migration_files_generated.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

pkg/pgmodel/migrations/sql/1_base_schema.up.sql

Lines changed: 38 additions & 0 deletions
@@ -1471,3 +1471,41 @@ CREATE VIEW SCHEMA_INFO.label AS
         ARRAY(SELECT value FROM SCHEMA_CATALOG.label l WHERE l.key = lk.key ORDER BY value)
         AS values
     FROM SCHEMA_CATALOG.label_key lk;
+
+
+
+
+--Decompression should take place in a procedure because we don't want locks held across
+--decompress_chunk calls since that function takes some heavier locks at the end.
+CREATE PROCEDURE SCHEMA_CATALOG.decompress_chunks_after(metric_table NAME, min_time TIMESTAMPTZ)
+AS $$
+DECLARE
+    chunk_row _timescaledb_catalog.chunk;
+    dimension_row _timescaledb_catalog.dimension;
+    hypertable_row _timescaledb_catalog.hypertable;
+    min_time_internal bigint;
+BEGIN
+    SELECT h.* INTO STRICT hypertable_row FROM _timescaledb_catalog.hypertable h
+    WHERE table_name = metric_table AND schema_name = 'SCHEMA_DATA';
+
+    SELECT d.* INTO STRICT dimension_row FROM _timescaledb_catalog.dimension d WHERE hypertable_id = hypertable_row.id ORDER BY id LIMIT 1;
+
+    SELECT _timescaledb_internal.time_to_internal(min_time) INTO STRICT min_time_internal;
+
+    FOR chunk_row IN
+        SELECT c.*
+        FROM _timescaledb_catalog.dimension_slice ds
+        INNER JOIN _timescaledb_catalog.chunk_constraint cc ON cc.dimension_slice_id = ds.id
+        INNER JOIN _timescaledb_catalog.chunk c ON cc.chunk_id = c.id
+        WHERE dimension_id = dimension_row.id
+        -- the range_starts are inclusive
+        AND min_time_internal >= ds.range_start
+        AND c.compressed_chunk_id IS NOT NULL
+        ORDER BY ds.range_start
+    LOOP
+        RAISE NOTICE 'Timescale-Prometheus is decompressing chunk: %.%', chunk_row.schema_name, chunk_row.table_name;
+        PERFORM decompress_chunk(format('%I.%I', chunk_row.schema_name, chunk_row.table_name)::regclass);
+        COMMIT;
+    END LOOP;
+END;
+$$ LANGUAGE PLPGSQL;
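The procedure commits after each decompress_chunk call, which is why it is a PROCEDURE invoked with CALL rather than a function: the heavier locks taken at the end of decompress_chunk are released chunk by chunk instead of being held for the whole run. The Go ingestor below calls it through its catalogSchema constant; as a standalone illustration only, a direct invocation with pgx would look roughly like this. The rendered schema name prom_catalog and the connection string are assumptions, since SCHEMA_CATALOG is a placeholder substituted at migration time.

package main

import (
	"context"
	"time"

	pgx "github.com/jackc/pgx/v4"
)

func main() {
	conn, err := pgx.Connect(context.Background(), "postgres://localhost:5432/timescale")
	if err != nil {
		panic(err)
	}
	defer conn.Close(context.Background())

	// Decompress every compressed chunk of the "test" metric whose time range starts
	// at or before 24 hours ago (the range_start comparison in the procedure is inclusive).
	_, err = conn.Exec(context.Background(),
		"CALL prom_catalog.decompress_chunks_after($1, $2);",
		"test", time.Now().Add(-24*time.Hour))
	if err != nil {
		panic(err)
	}
}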

pkg/pgmodel/pgx.go

Lines changed: 71 additions & 2 deletions
@@ -7,7 +7,9 @@ package pgmodel
 import (
 	"context"
 	"fmt"
+	"math"
 	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -116,18 +118,28 @@ type SampleInfoIterator struct {
 	sampleInfos     []samplesInfo
 	sampleInfoIndex int
 	sampleIndex     int
+	minSeen         int64
 }
 
 // NewSampleInfoIterator is the constructor
 func NewSampleInfoIterator() SampleInfoIterator {
-	return SampleInfoIterator{sampleInfos: make([]samplesInfo, 0), sampleIndex: -1, sampleInfoIndex: 0}
+	si := SampleInfoIterator{sampleInfos: make([]samplesInfo, 0)}
+	si.ResetPosition()
+	return si
 }
 
 //Append adds a sample info to the back of the iterator
 func (t *SampleInfoIterator) Append(s samplesInfo) {
 	t.sampleInfos = append(t.sampleInfos, s)
 }
 
+//ResetPosition resets the iteration position to the beginning
+func (t *SampleInfoIterator) ResetPosition() {
+	t.sampleIndex = -1
+	t.sampleInfoIndex = 0
+	t.minSeen = math.MaxInt64
+}
+
 // Next returns true if there is another row and makes the next row data
 // available to Values(). When there are no more rows available or an error
 // has occurred it returns false.
@@ -149,6 +161,9 @@ func (t *SampleInfoIterator) Values() ([]interface{}, error) {
 		sample.Value,
 		info.seriesID,
 	}
+	if t.minSeen > sample.Timestamp {
+		t.minSeen = sample.Timestamp
+	}
 	return row, nil
 }
 
@@ -390,6 +405,7 @@ type insertHandler struct {
 	pending         *pendingBuffer
 	seriesCache     map[string]SeriesID
 	metricTableName string
+	metricName      string
 }
 
 type pendingBuffer struct {
@@ -480,6 +496,7 @@ func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName s
 		pending:         &pendingBuffer{make([]insertDataTask, 0), NewSampleInfoIterator()},
 		seriesCache:     make(map[string]SeriesID),
 		metricTableName: tableName,
+		metricName:      metricName,
 	}
 
 	for {
@@ -560,6 +577,40 @@ func (h *insertHandler) flush() {
 	h.flushPending(h.pending)
 }
 
+func (h *insertHandler) decompressChunks(pending *pendingBuffer) error {
+	log.Warn("msg", fmt.Sprintf("Table %s was compressed, decompressing", h.metricTableName), "table", h.metricTableName, "metric", h.metricName)
+	minTime := model.Time(pending.batch.minSeen).Time()
+
+	//how much faster are we at ingestion than wall-clock time?
+	ingestSpeedup := 2
+	//delay the next compression job proportional to the duration between now and the data time + a constant safety
+	delayBy := (time.Since(minTime) / time.Duration(ingestSpeedup)) + time.Duration(60*time.Minute)
+	maxDelayBy := time.Hour * 24
+	if delayBy > maxDelayBy {
+		delayBy = maxDelayBy
+	}
+
+	_, rescheduleErr := h.conn.Exec(context.Background(),
+		`SELECT alter_job_schedule(
+			(SELECT job_id
+			FROM _timescaledb_config.bgw_policy_compress_chunks p
+			INNER JOIN _timescaledb_catalog.hypertable h ON (h.id = p.hypertable_id)
+			WHERE h.schema_name = $1 and h.table_name = $2),
+			next_start=>$3)`, dataSchema, h.metricTableName, time.Now().Add(delayBy))
+	if rescheduleErr != nil {
+		log.Error("msg", rescheduleErr, "context", "Rescheduling compression")
+		return rescheduleErr
+	}
+
+	_, decompressErr := h.conn.Exec(context.Background(), "CALL "+catalogSchema+".decompress_chunks_after($1, $2);", h.metricTableName, minTime)
+	if decompressErr != nil {
+		log.Error("msg", decompressErr, "context", "Decompressing chunks")
+		return decompressErr
+	}
+
+	return nil
+}
+
 func (h *insertHandler) flushPending(pending *pendingBuffer) {
 	err := func() error {
 		_, err := h.setSeriesIds(pending.batch.sampleInfos)
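The rescheduling arithmetic in decompressChunks is easier to read with concrete numbers: the age of the oldest pending sample is halved (on the assumption that ingest replays roughly twice as fast as wall-clock time), a 60-minute safety margin is added, and the result is capped at 24 hours. A standalone sketch follows; the helper name delayFor is illustrative and not part of the commit.

package main

import (
	"fmt"
	"time"
)

// delayFor mirrors the delay calculation above: halve the age of the oldest pending
// sample, add a 60-minute safety margin, and never push the compression job out by
// more than 24 hours.
func delayFor(oldestSampleAge time.Duration) time.Duration {
	const ingestSpeedup = 2
	delayBy := oldestSampleAge/ingestSpeedup + 60*time.Minute
	if maxDelayBy := 24 * time.Hour; delayBy > maxDelayBy {
		delayBy = maxDelayBy
	}
	return delayBy
}

func main() {
	fmt.Println(delayFor(4 * time.Hour))  // 3h0m0s
	fmt.Println(delayFor(72 * time.Hour)) // 24h0m0s (hits the cap)
}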
@@ -573,6 +624,23 @@ func (h *insertHandler) flushPending(pending *pendingBuffer) {
 			copyColumns,
 			&pending.batch,
 		)
+		if err != nil {
+			if pgErr, ok := err.(*pgconn.PgError); ok && strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
+				/* If the error was that the table is already compressed, decompress and try again. */
+				decompressErr := h.decompressChunks(pending)
+				if decompressErr != nil {
+					return fmt.Errorf("Error while decompressing. Decompression error: %v, Original Error: %w", decompressErr, err)
+				}
+
+				pending.batch.ResetPosition()
+				_, err = h.conn.CopyFrom(
+					context.Background(),
+					pgx.Identifier{dataSchema, h.metricTableName},
+					copyColumns,
+					&pending.batch,
+				)
+			}
+		}
 		return err
 	}()
 
@@ -592,7 +660,8 @@ func (h *insertHandler) flushPending(pending *pendingBuffer) {
 		// nil all pointers to prevent memory leaks
 		pending.batch.sampleInfos[i] = samplesInfo{}
 	}
-	pending.batch = SampleInfoIterator{sampleInfos: pending.batch.sampleInfos[:0], sampleIndex: -1, sampleInfoIndex: 0}
+	pending.batch = SampleInfoIterator{sampleInfos: pending.batch.sampleInfos[:0]}
+	pending.batch.ResetPosition()
 }
 
 func (h *insertHandler) setSeriesIds(sampleInfos []samplesInfo) (string, error) {
