This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 79aee2c

Ignore duplicate data points
Prometheus does not allow data points with a duplicate (timestamp, series id). To replicate this behavior, this commit adds a UNIQUE index to each of the metric data tables, and adds a fallback path so that COPY FROMs that fail due to duplicate keys are retried as INSERT ... ON CONFLICT DO NOTHING.
1 parent: 293676a

File tree: 5 files changed (+213, -36 lines)
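The mechanism described in the commit message can be sketched in plain SQL. This is an illustrative, standalone example only: the table name test_data, the schema, and the sample values are hypothetical stand-ins for the per-metric data tables that 1_base_schema.up.sql creates dynamically, and the final statement mirrors the INSERT ... ON CONFLICT DO NOTHING fallback added to pkg/pgmodel/pgx.go.

-- toy stand-in for one metric's data table, with the new UNIQUE index
CREATE TABLE test_data(time TIMESTAMPTZ NOT NULL, value DOUBLE PRECISION, series_id INT NOT NULL);
CREATE UNIQUE INDEX test_data_series_id_time ON test_data (series_id, time) INCLUDE (value);

INSERT INTO test_data VALUES ('2020-01-01 00:00:00+00', 1.0, 1);

-- re-sending the same (series_id, time) now violates the unique index, so a
-- plain COPY/INSERT of the batch fails with SQLSTATE 23505 (unique_violation)
INSERT INTO test_data VALUES ('2020-01-01 00:00:00+00', 2.0, 1);

-- the fallback retries the batch with ON CONFLICT DO NOTHING, which skips the
-- duplicate row and still inserts the rest of the batch
INSERT INTO test_data VALUES
    ('2020-01-01 00:00:00+00', 2.0, 1),  -- skipped: duplicate (series_id, time)
    ('2020-01-01 00:00:30+00', 3.0, 1)   -- inserted
ON CONFLICT DO NOTHING;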

pkg/log/log.go
Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@ import (
 
 var (
 	// Application wide logger
-	logger log.Logger
+	logger log.Logger = log.NewNopLogger()
 
 	// logger timestamp format
 	timestampFormat = log.TimestampFormat(

pkg/pgmodel/end_to_end_tests/create_test.go
Lines changed: 114 additions & 5 deletions

@@ -6,10 +6,12 @@ package end_to_end_tests
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"testing"
 	"time"
 
 	"github.com/jackc/pgconn"
+	"github.com/jackc/pgerrcode"
 	pgx "github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/timescale/timescale-prometheus/pkg/prompb"
@@ -316,7 +318,7 @@ func TestSQLIngest(t *testing.T) {
 					},
 				},
 			},
-			count: 2,
+			count: 1,
 			countSeries: 1,
 		},
 		{
@@ -404,8 +406,12 @@ func TestSQLIngest(t *testing.T) {
 				t.Fatalf("got an unexpected error %v", err)
 			}
 
-			if cnt != tcase.count {
-				t.Fatalf("counts not equal: got %v expected %v\n", cnt, tcase.count)
+			// our reporting of inserts is necessarily inexact: due to
+			// duplicates being dropped, we report the number of samples the
+			// ingestor handled, not necessarily how many were inserted
+			// into the DB
+			if cnt < tcase.count {
+				t.Fatalf("incorrect counts: got %v expected %v\n", cnt, tcase.count)
 			}
 
 			if err != nil {
@@ -430,8 +436,8 @@ func TestSQLIngest(t *testing.T) {
 				totalRows += rowsInTable
 			}
 
-			if totalRows != int(cnt) {
-				t.Fatalf("counts not equal: got %v expected %v\n", totalRows, cnt)
+			if totalRows != int(tcase.count) {
+				t.Fatalf("counts not equal: got %v expected %v\n", totalRows, tcase.count)
 			}
 
 			err = ingestor.CompleteMetricCreation()
@@ -452,6 +458,109 @@ func TestSQLIngest(t *testing.T) {
 		}
 	}
 }
 
+func TestInsertCompressedDuplicates(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		ts := []prompb.TimeSeries{
+			{
+				Labels: []prompb.Label{
+					{Name: MetricNameLabelName, Value: "test"},
+					{Name: "test", Value: "test"},
+				},
+				Samples: []prompb.Sample{
+					{Timestamp: 10000000, Value: 1.0},
+				},
+			},
+		}
+		ingestor, err := NewPgxIngestor(db)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer ingestor.Close()
+		_, err = ingestor.Ingest(ts, NewWriteRequest())
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = ingestor.CompleteMetricCreation()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.test') i;")
+
+		if err != nil {
+			if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.SQLState() == pgerrcode.DuplicateObject {
+				// already compressed (could happen if policy already ran). This is fine.
+			} else {
+				t.Fatal(err)
+			}
+		}
+
+		ts = []prompb.TimeSeries{
+			{
+				Labels: []prompb.Label{
+					{Name: MetricNameLabelName, Value: "test"},
+					{Name: "test", Value: "test"},
+				},
+				Samples: []prompb.Sample{
+					{Timestamp: 1, Value: 0.0},
+				},
+			},
+		}
+
+		_, err = ingestor.Ingest(ts, NewWriteRequest())
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = ingestor.CompleteMetricCreation()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		ts = []prompb.TimeSeries{
+			{
+				Labels: []prompb.Label{
+					{Name: MetricNameLabelName, Value: "test"},
+					{Name: "test", Value: "test"},
+				},
+				Samples: []prompb.Sample{
+					{Timestamp: 1, Value: 0.2},
+					{Timestamp: 10, Value: 3.0},
+					{Timestamp: 10000000, Value: 4.0},
+					{Timestamp: 10000001, Value: 5.0},
+				},
+			},
+		}
+
+		// ingest duplicate after compression
+		_, err = ingestor.Ingest(ts, NewWriteRequest())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		rows, err := db.Query(context.Background(), "SELECT value FROM prom_data.test ORDER BY time")
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		expected := []float64{0.0, 3.0, 1.0, 5.0}
+		found := make([]float64, 0, 4)
+		for rows.Next() {
+			var value float64
+			err = rows.Scan(&value)
+			if err != nil {
+				t.Fatal(err)
+			}
+			found = append(found, value)
+		}
+		if !reflect.DeepEqual(expected, found) {
+			t.Errorf("wrong values in DB\nexpected:\n\t%v\ngot:\n\t%v", expected, found)
+		}
+	})
+}
+
 func TestInsertCompressed(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping integration test")

pkg/pgmodel/migrations/migration_files_generated.go

Lines changed: 2 additions & 2 deletions
Generated file; diff not rendered by default.

pkg/pgmodel/migrations/sql/1_base_schema.up.sql
Lines changed: 5 additions & 5 deletions

@@ -266,7 +266,7 @@ DECLARE
 BEGIN
     EXECUTE format('CREATE TABLE SCHEMA_DATA.%I(time TIMESTAMPTZ NOT NULL, value DOUBLE PRECISION, series_id INT NOT NULL)',
                     NEW.table_name);
-    EXECUTE format('CREATE INDEX data_series_id_time_%s ON SCHEMA_DATA.%I (series_id, time) INCLUDE (value)',
+    EXECUTE format('CREATE UNIQUE INDEX data_series_id_time_%s ON SCHEMA_DATA.%I (series_id, time) INCLUDE (value)',
                     NEW.id, NEW.table_name);
     PERFORM create_hypertable(format('SCHEMA_DATA.%I', NEW.table_name), 'time',
                               chunk_time_interval=>SCHEMA_CATALOG.get_default_chunk_interval(),
@@ -803,10 +803,10 @@ LANGUAGE PLPGSQL VOLATILE;
 GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.create_series(int, name, SCHEMA_PROM.label_array) TO prom_writer;
 
 -- There shouldn't be a need to have a read only version of this as we'll use
--- the eq or other matcher functions to find series ids like this. However,
+-- the eq or other matcher functions to find series ids like this. However,
 -- there are possible use cases that need the series id directly for performance
--- that we might want to see if we need to support, in which case a
--- read only version might be useful in future.
+-- that we might want to see if we need to support, in which case a
+-- read only version might be useful in future.
 CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_series_id(label jsonb)
 RETURNS BIGINT AS $$
 DECLARE
@@ -1537,4 +1537,4 @@ BEGIN
     COMMIT;
     END LOOP;
 END;
-$$ LANGUAGE PLPGSQL;
+$$ LANGUAGE PLPGSQL;

pkg/pgmodel/pgx.go
Lines changed: 91 additions & 23 deletions

@@ -643,37 +643,105 @@ func runCopyFrom(conn pgxConn, in chan copyRequest) {
 		if !ok {
 			return
 		}
-		_, err := conn.CopyFrom(
-			context.Background(),
-			pgx.Identifier{dataSchema, req.table},
-			copyColumns,
-			&req.data.batch,
-		)
+		err := doCopyFrom(conn, req)
 		if err != nil {
-			if pgErr, ok := err.(*pgconn.PgError); ok && strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
-				/* If the error was that the table is already compressed, decompress and try again. */
-				decompressErr := decompressChunks(conn, req.data, req.table)
-				if decompressErr != nil {
-					req.data.reportResults(err)
-					pendingBuffers.Put(req.data)
-					continue
-				}
-
-				req.data.batch.ResetPosition()
-				_, err = conn.CopyFrom(
-					context.Background(),
-					pgx.Identifier{dataSchema, req.table},
-					copyColumns,
-					&req.data.batch,
-				)
-			}
+			err = copyFromErrorFallback(conn, req, err)
 		}
 
 		req.data.reportResults(err)
 		pendingBuffers.Put(req.data)
 	}
 }
 
+// certain errors are recoverable; handle those we can:
+// 1. if the data contains duplicates, switch to INSERT ... ON CONFLICT DO NOTHING
+//    so the duplicates are dropped and the other data can go in
+// 2. if the table is compressed, decompress and retry the insertion
+func copyFromErrorFallback(conn pgxConn, req copyRequest, err error) error {
+	// we use either COPY FROM or INSERT depending on whether the dataset has
+	// duplicates; if we do need to use INSERT, we need to keep doing so,
+	// otherwise we'll fail due to duplicates.
+	insertFn := doCopyFrom
+	for i := 0; err != nil && i < 10; i++ {
+		insertFn, err = tryRecovery(conn, req, insertFn, err)
+		if insertFn == nil || err != nil {
+			return err
+		}
+		err = insertFn(conn, req)
+	}
+	if err != nil {
+		log.Warn("msg", fmt.Sprintf("time out while processing error for %s", req.table), "error", err.Error())
+	}
+	return err
+}
+
+type insertionFunction = func(conn pgxConn, req copyRequest) error
+
+// we can currently recover from two errors:
+// If we inserted duplicate data we switch to using INSERT ... ON CONFLICT DO NOTHING
+// to skip redundant data points and try again.
+// If we inserted into a compressed chunk, we decompress the chunk and try again.
+// Since a single batch can have both errors, we need to remember the insert method
+// we're using, so that we deduplicate if needed.
+func tryRecovery(conn pgxConn, req copyRequest, insertFn insertionFunction, err error) (insertionFunction, error) {
+
+	// we only recover from postgres errors right now
+	pgErr, ok := err.(*pgconn.PgError)
+	if !ok {
+		errMsg := err.Error()
+		log.Warn("msg", fmt.Sprintf("unexpected error while inserting to %s", req.table), "error", errMsg)
+		return nil, err
+	}
+
+	// if we have duplicate data points try again using
+	// INSERT ... ON CONFLICT DO NOTHING to insert only the new ones
+	if pgErr.Code == pgerrcode.UniqueViolation {
+		log.Warn("msg", fmt.Sprintf("duplicate data in sample for %s", req.table), "table", req.table)
+		req.data.batch.ResetPosition()
+		return doInsert, nil
+	}
+
+	// If the error was that the table is already compressed, decompress and try again.
+	if strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
+		decompressErr := decompressChunks(conn, req.data, req.table)
+		if decompressErr != nil {
+			return nil, err
+		}
+
+		req.data.batch.ResetPosition()
+		// use the old insertFn in case it was doing deduplication
+		return insertFn, nil
+	}
+
+	log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "error", pgErr.Error())
+	return nil, err
+}
+
+func doCopyFrom(conn pgxConn, req copyRequest) error {
+	_, err := conn.CopyFrom(
+		context.Background(),
+		pgx.Identifier{dataSchema, req.table},
+		copyColumns,
+		&req.data.batch,
+	)
+	return err
+}
+
+func doInsert(conn pgxConn, req copyRequest) error {
+	queryString := fmt.Sprintf("INSERT INTO %s.%s(time, value, series_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", dataSchema, req.table)
+	insertBatch := conn.NewBatch()
+	for req.data.batch.Next() {
+		values, _ := req.data.batch.Values()
+		insertBatch.Queue(queryString, values...)
+	}
+	resp, err := conn.SendBatch(context.Background(), insertBatch)
+	err2 := resp.Close()
+	if err == nil {
+		err = err2
+	}
+	return err
+}
+
 func decompressChunks(conn pgxConn, pending *pendingBuffer, table string) error {
 	minTime := model.Time(pending.batch.minSeen).Time()