Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit f353cbb

Browse files
committed
Fix deadlock when getting series
Previoussly there was a deadlock in get_series_id functions because the get_or_create_metric_table_name call may take an exclusive lock on the series table but the startup of the sql function took an access lock on that same table because SQL functions parse/plan all statements in the function before executing any. Thus, even though get_or_create_metric_table_name was executed before any series table access, the lock was already taken. This lock escalation caused a deadlock because the insert on the metric table has an ON CONFLICT DO NOTHING and thus a waiting process may wait on a winning process to complete while holding an access lock on the series table, but the winning process needs an exclusive lock on the series table. Switching to a PLPGSQL function helps because locks are taken in order of execution and so there is no lock escalation. We also added complete ingest concurrency tests to try to catch this in the future. Finally, we ran benchmark tests and it seems that the switch to PLPGSQL does not significantly degrade performance.
1 parent 795b052 commit f353cbb

File tree

4 files changed

+195
-11
lines changed

4 files changed

+195
-11
lines changed

pkg/pgmodel/end_to_end_tests/concurrent_sql_test.go

Lines changed: 169 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import (
1010
"testing"
1111

1212
"github.com/jackc/pgx/v4/pgxpool"
13+
"github.com/prometheus/prometheus/prompb"
1314

1415
_ "github.com/jackc/pgx/v4/stdlib"
16+
. "github.com/timescale/timescale-prometheus/pkg/pgmodel"
1517
)
1618

1719
func testConcurrentMetricTable(t testing.TB, db *pgxpool.Pool, metricName string) int64 {
@@ -39,10 +41,10 @@ func testConcurrentNewLabel(t testing.TB, db *pgxpool.Pool, labelName string) in
3941
return *id
4042
}
4143

42-
func testConcurrentCreateSeries(t testing.TB, db *pgxpool.Pool, index int) int64 {
44+
func testConcurrentCreateSeries(t testing.TB, db *pgxpool.Pool, index int, metricName string) int64 {
4345
var id *int64
4446
err := db.QueryRow(context.Background(), "SELECT _prom_catalog.create_series((SELECT id FROM _prom_catalog.metric WHERE metric_name=$3), $1, array[(SELECT id FROM _prom_catalog.label WHERE key = '__name__' AND value=$3), $2::int])",
45-
fmt.Sprintf("metric_%d", index), index, fmt.Sprintf("metric_%d", index)).Scan(&id)
47+
metricName, index, metricName).Scan(&id)
4648
if err != nil {
4749
t.Fatal(err)
4850
}
@@ -52,6 +54,32 @@ func testConcurrentCreateSeries(t testing.TB, db *pgxpool.Pool, index int) int64
5254
return *id
5355
}
5456

57+
func testConcurrentGetSeries(t testing.TB, db *pgxpool.Pool, metricName string) int64 {
58+
var id *int64
59+
err := db.QueryRow(context.Background(), "SELECT _prom_catalog.get_series_id_for_key_value_array($1, array['__name__'], array[$1])",
60+
metricName).Scan(&id)
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
if id == nil {
65+
t.Fatalf("NULL found")
66+
}
67+
return *id
68+
}
69+
70+
func testConcurrentGOCMetricTable(t testing.TB, db *pgxpool.Pool, metricName string) int64 {
71+
var id *int64
72+
var name *string
73+
err := db.QueryRow(context.Background(), "SELECT id, table_name FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&id, &name)
74+
if err != nil {
75+
t.Fatal(err)
76+
}
77+
if id == nil || name == nil {
78+
t.Fatalf("NULL found")
79+
}
80+
return *id
81+
}
82+
5583
func TestConcurrentSQL(t *testing.T) {
5684
if testing.Short() {
5785
t.Skip("skipping integration test")
@@ -94,11 +122,41 @@ func TestConcurrentSQL(t *testing.T) {
94122
wg.Add(2)
95123
go func() {
96124
defer wg.Done()
97-
id1 = testConcurrentCreateSeries(t, db, i)
125+
id1 = testConcurrentCreateSeries(t, db, i, fmt.Sprintf("metric_%d", i))
126+
}()
127+
go func() {
128+
defer wg.Done()
129+
id2 = testConcurrentCreateSeries(t, db, i, fmt.Sprintf("metric_%d", i))
130+
}()
131+
wg.Wait()
132+
133+
if id1 != id2 {
134+
t.Fatalf("ids aren't equal: %d != %d", id1, id2)
135+
}
136+
137+
wg.Add(2)
138+
go func() {
139+
defer wg.Done()
140+
id1 = testConcurrentGOCMetricTable(t, db, fmt.Sprintf("goc_metric_%d", i))
141+
}()
142+
go func() {
143+
defer wg.Done()
144+
id2 = testConcurrentGOCMetricTable(t, db, fmt.Sprintf("goc_metric_%d", i))
145+
}()
146+
wg.Wait()
147+
148+
if id1 != id2 {
149+
t.Fatalf("ids aren't equal: %d != %d", id1, id2)
150+
}
151+
152+
wg.Add(2)
153+
go func() {
154+
defer wg.Done()
155+
id1 = testConcurrentGetSeries(t, db, fmt.Sprintf("gs_metric_%d", i))
98156
}()
99157
go func() {
100158
defer wg.Done()
101-
id2 = testConcurrentCreateSeries(t, db, i)
159+
id2 = testConcurrentGetSeries(t, db, fmt.Sprintf("gs_metric_%d", i))
102160
}()
103161
wg.Wait()
104162

@@ -108,3 +166,110 @@ func TestConcurrentSQL(t *testing.T) {
108166
}
109167
})
110168
}
169+
170+
func testConcurrentInsertSimple(t testing.TB, db *pgxpool.Pool) {
171+
metrics := []prompb.TimeSeries{
172+
{
173+
Labels: []prompb.Label{
174+
{Name: MetricNameLabelName, Value: "cpu_usage"},
175+
},
176+
Samples: []prompb.Sample{
177+
{Timestamp: 10, Value: 0.5},
178+
},
179+
},
180+
}
181+
182+
ingestor := NewPgxIngestor(db)
183+
defer ingestor.Close()
184+
_, err := ingestor.Ingest(metrics)
185+
if err != nil {
186+
t.Fatal(err)
187+
}
188+
}
189+
190+
func testConcurrentInsertAdvanced(t testing.TB, db *pgxpool.Pool) {
191+
metrics := []prompb.TimeSeries{
192+
{
193+
Labels: []prompb.Label{
194+
{Name: MetricNameLabelName, Value: "cpu_usage_a"},
195+
{Name: "namespace", Value: "production"},
196+
{Name: "node", Value: "brain"},
197+
},
198+
Samples: []prompb.Sample{
199+
{Timestamp: 10, Value: 0.5},
200+
{Timestamp: 40, Value: 0.6},
201+
},
202+
},
203+
{
204+
Labels: []prompb.Label{
205+
{Name: MetricNameLabelName, Value: "cpu_usage_a"},
206+
{Name: "namespace", Value: "dev"},
207+
{Name: "node", Value: "pinky"},
208+
},
209+
Samples: []prompb.Sample{
210+
{Timestamp: 10, Value: 0.1},
211+
{Timestamp: 45, Value: 0.2},
212+
},
213+
},
214+
{
215+
Labels: []prompb.Label{
216+
{Name: MetricNameLabelName, Value: "cpu_total_a"},
217+
{Name: "namespace", Value: "production"},
218+
{Name: "node", Value: "brain"},
219+
},
220+
Samples: []prompb.Sample{
221+
{Timestamp: 10, Value: 0.5},
222+
{Timestamp: 40, Value: 0.6},
223+
},
224+
},
225+
{
226+
Labels: []prompb.Label{
227+
{Name: MetricNameLabelName, Value: "cpu_total_a"},
228+
{Name: "namespace", Value: "dev"},
229+
{Name: "node", Value: "pinky"},
230+
},
231+
Samples: []prompb.Sample{
232+
{Timestamp: 10, Value: 0.1},
233+
{Timestamp: 45, Value: 0.2},
234+
},
235+
},
236+
}
237+
238+
ingestor := NewPgxIngestor(db)
239+
defer ingestor.Close()
240+
_, err := ingestor.Ingest(metrics)
241+
if err != nil {
242+
t.Fatal(err)
243+
}
244+
}
245+
246+
func TestConcurrentInsert(t *testing.T) {
247+
if testing.Short() {
248+
t.Skip("skipping integration test")
249+
}
250+
251+
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
252+
var wg sync.WaitGroup
253+
wg.Add(2)
254+
go func() {
255+
defer wg.Done()
256+
testConcurrentInsertSimple(t, db)
257+
}()
258+
go func() {
259+
defer wg.Done()
260+
testConcurrentInsertSimple(t, db)
261+
}()
262+
wg.Wait()
263+
264+
wg.Add(2)
265+
go func() {
266+
defer wg.Done()
267+
testConcurrentInsertAdvanced(t, db)
268+
}()
269+
go func() {
270+
defer wg.Done()
271+
testConcurrentInsertAdvanced(t, db)
272+
}()
273+
wg.Wait()
274+
})
275+
}

pkg/pgmodel/end_to_end_tests/sql_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func createMetricTableName(db *pgxpool.Pool, name string) error {
251251

252252
func getSeriesIDForKeyValueArray(db *pgxpool.Pool, metricName string, keys []string, values []string) error {
253253
var seriesIDKeyVal int
254-
return db.QueryRow(context.Background(), "SELECT _prom_catalog.get_series_id_for_key_value_array($1, $2, $3)", metricName, keys, values).Scan(&seriesIDKeyVal)
254+
return db.QueryRow(context.Background(), "SELECT _prom_catalog.get_series_id_for_key_value_array($1, $2, $3)", metricName, append([]string{"__name__"}, keys...), append([]string{metricName}, values...)).Scan(&seriesIDKeyVal)
255255
}
256256

257257
func generateKeysAndValues(count int, prefix string) ([]string, []string) {

pkg/pgmodel/migrations/migration_files_generated.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/pgmodel/migrations/sql/1_base_schema.up.sql

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -693,8 +693,11 @@ GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.create_series(int, name, SCHEMA_PROM.la
693693

694694
CREATE OR REPLACE FUNCTION SCHEMA_PROM.series_id(label jsonb)
695695
RETURNS BIGINT AS $$
696+
DECLARE
697+
series_id bigint;
698+
BEGIN
696699
--need to make sure the series partition exists
697-
SELECT SCHEMA_CATALOG.get_or_create_metric_table_name(label->>'__name__');
700+
PERFORM SCHEMA_CATALOG.get_or_create_metric_table_name(label->>'__name__');
698701

699702
WITH CTE AS (
700703
SELECT SCHEMA_PROM.label_array(label)
@@ -707,17 +710,29 @@ RETURNS BIGINT AS $$
707710
SELECT SCHEMA_CATALOG.create_series(id, table_name, (SELECT * FROM cte))
708711
FROM SCHEMA_CATALOG.get_or_create_metric_table_name(label->>'__name__')
709712
LIMIT 1
713+
INTO series_id;
714+
715+
RETURN series_id;
716+
END
710717
$$
711-
LANGUAGE SQL VOLATILE;
718+
LANGUAGE PLPGSQL VOLATILE;
712719
COMMENT ON FUNCTION SCHEMA_PROM.series_id(jsonb)
713720
IS 'returns the series id that exactly matches a JSONB of labels';
714721
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.series_id(jsonb) TO prom_writer;
715722

716723
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_series_id_for_key_value_array(metric_name TEXT, label_keys text[], label_values text[])
717724
RETURNS BIGINT AS $$
725+
DECLARE
726+
series_id bigint;
727+
BEGIN
718728
--need to make sure the series partition exists
719-
SELECT SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name);
729+
PERFORM SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name);
720730

731+
--This query MUST take all of its locks after the create metric table above
732+
--since this requires a lower level lock on the series table than the potential
733+
--exclusive lock on series when adding the series partition. This is why this
734+
--is a PLPGSQL function and not a SQL Function (a SQL function parses/plans all
735+
--statement during startup, which would take a lock on the series table)
721736
WITH CTE AS (
722737
SELECT SCHEMA_PROM.label_array(metric_name, label_keys, label_values)
723738
)
@@ -729,8 +744,12 @@ RETURNS BIGINT AS $$
729744
SELECT SCHEMA_CATALOG.create_series(id, table_name, (SELECT * FROM cte))
730745
FROM SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name)
731746
LIMIT 1
747+
INTO series_id;
748+
749+
RETURN series_id;
750+
END
732751
$$
733-
LANGUAGE SQL VOLATILE;
752+
LANGUAGE PLPGSQL VOLATILE;
734753
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_series_id_for_key_value_array(TEXT, text[], text[]) TO prom_writer;
735754
--
736755
-- Parameter manipulation functions

0 commit comments

Comments
 (0)