Skip to content

Commit 26a2c50

Browse files
committed
Relax lock when processing CAgg invalidation logs
In timescale#8515 we made some improvements on Concurrent CAgg Refreshes but an oversight happened keeping the `ExclusiveLock` in the materialization hypertable during the CAgg invalidation logs processing ending up with concurrent refreshes on non-overlaping ranges waiting for each other. Fixed it by relaxing the `ExclusiveLock` to `ShareUpdateExclusiveLock` since we should only guarantee that the CAgg invalidation logs processing transaction be executed serially but the next transaction (materialization phase) can't be blocked by this or block other concurrent refreshes.
1 parent 9a47b63 commit 26a2c50

File tree

3 files changed

+129
-34
lines changed

3 files changed

+129
-34
lines changed

tsl/src/continuous_aggs/refresh.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,12 +697,15 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
697697
* This is supposed to be a short transaction and in the future we can consider
698698
* relaxing this lock.
699699
*/
700-
LockRelationOid(hyper_relid, ExclusiveLock);
700+
LockRelationOid(hyper_relid, ShareUpdateExclusiveLock);
701701
invalidations = invalidation_process_cagg_log(cagg,
702702
refresh_window,
703703
ts_guc_cagg_max_individual_materializations,
704704
context,
705705
force);
706+
707+
DEBUG_WAITPOINT("before_process_cagg_invalidations_for_refresh_lock");
708+
706709
SPI_commit_and_chain();
707710

708711
DEBUG_WAITPOINT("after_process_cagg_invalidations_for_refresh_lock");
@@ -957,6 +960,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
957960
if (!refreshed && !has_pending_materializations)
958961
emit_up_to_date_notice(cagg, context);
959962

963+
DEBUG_WAITPOINT("after_process_cagg_materializations");
964+
960965
/* Restore search_path */
961966
AtEOXact_GUC(false, save_nestlevel);
962967

tsl/test/isolation/expected/cagg_concurrent_refresh.out

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Parsed test spec with 14 sessions
1+
Parsed test spec with 16 sessions
22

33
starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
44
step R1_refresh:
@@ -544,7 +544,7 @@ step R12_refresh:
544544
CALL refresh_continuous_aggregate('cond2_10', 25, 70);
545545

546546

547-
starting permutation: WP_enable R1_refresh R6_pending_materialization_ranges R5_refresh R6_pending_materialization_ranges WP_release R6_pending_materialization_ranges S1_select
547+
starting permutation: WP_after_enable R1_refresh R6_pending_materialization_ranges R5_refresh R6_pending_materialization_ranges WP_after_release R6_pending_materialization_ranges S1_select
548548
R5: LOG: statement:
549549
SET SESSION lock_timeout = '500ms';
550550
SET SESSION deadlock_timeout = '500ms';
@@ -553,7 +553,7 @@ R5: LOG: statement:
553553
L1: WARNING: there is already a transaction in progress
554554
L2: WARNING: there is already a transaction in progress
555555
L3: WARNING: there is already a transaction in progress
556-
step WP_enable:
556+
step WP_after_enable:
557557
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');
558558

559559
debug_waitpoint_enable
@@ -587,7 +587,7 @@ cond_10 | 30| 70
587587
cond_10 | 70| 100
588588
(2 rows)
589589

590-
step WP_release:
590+
step WP_after_release:
591591
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');
592592

593593
debug_waitpoint_release
@@ -645,7 +645,7 @@ conditions2|-2147483648
645645
(2 rows)
646646

647647

648-
starting permutation: WP_enable R6_pending_materialization_ranges R1_refresh R3_refresh K1_killpid R6_pending_materialization_ranges WP_release R13_refresh R6_pending_materialization_ranges
648+
starting permutation: WP_after_enable R6_pending_materialization_ranges R1_refresh R3_refresh K1_cancelpid R6_pending_materialization_ranges WP_after_release R13_refresh R6_pending_materialization_ranges
649649
R5: LOG: statement:
650650
SET SESSION lock_timeout = '500ms';
651651
SET SESSION deadlock_timeout = '500ms';
@@ -654,7 +654,7 @@ R5: LOG: statement:
654654
L1: WARNING: there is already a transaction in progress
655655
L2: WARNING: there is already a transaction in progress
656656
L3: WARNING: there is already a transaction in progress
657-
step WP_enable:
657+
step WP_after_enable:
658658
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');
659659

660660
debug_waitpoint_enable
@@ -675,15 +675,11 @@ step R1_refresh:
675675
step R3_refresh:
676676
CALL refresh_continuous_aggregate('cond_10', 70, 107);
677677
<waiting ...>
678-
step K1_killpid:
679-
CALL killpids();
678+
step K1_cancelpid:
679+
CALL cancelpids();
680680
<waiting ...>
681681
step R1_refresh: <... completed>
682-
FATAL: terminating connection due to administrator command
683-
server closed the connection unexpectedly
684-
This probably means the server terminated abnormally
685-
before or while processing the request.
686-
682+
ERROR: canceling statement due to user request
687683
step R6_pending_materialization_ranges:
688684
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
689685

@@ -693,8 +689,8 @@ cond_10 | 30| 70
693689
cond_10 | 70| 100
694690
(2 rows)
695691

696-
step K1_killpid: <... completed>
697-
step WP_release:
692+
step K1_cancelpid: <... completed>
693+
step WP_after_release:
698694
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');
699695

700696
debug_waitpoint_release
@@ -713,3 +709,70 @@ user_view_name|lowest_modified_value|greatest_modified_value
713709
--------------+---------------------+-----------------------
714710
(0 rows)
715711

712+
713+
starting permutation: WP_before_enable R1_refresh R3_refresh WP_before_release
714+
R5: LOG: statement:
715+
SET SESSION lock_timeout = '500ms';
716+
SET SESSION deadlock_timeout = '500ms';
717+
SET SESSION client_min_messages = 'DEBUG1';
718+
719+
L1: WARNING: there is already a transaction in progress
720+
L2: WARNING: there is already a transaction in progress
721+
L3: WARNING: there is already a transaction in progress
722+
step WP_before_enable:
723+
SELECT debug_waitpoint_enable('before_process_cagg_invalidations_for_refresh_lock');
724+
725+
debug_waitpoint_enable
726+
----------------------
727+
728+
(1 row)
729+
730+
step R1_refresh:
731+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
732+
<waiting ...>
733+
step R3_refresh:
734+
CALL refresh_continuous_aggregate('cond_10', 70, 107);
735+
<waiting ...>
736+
step WP_before_release:
737+
SELECT debug_waitpoint_release('before_process_cagg_invalidations_for_refresh_lock');
738+
739+
debug_waitpoint_release
740+
-----------------------
741+
742+
(1 row)
743+
744+
step R1_refresh: <... completed>
745+
step R3_refresh: <... completed>
746+
747+
starting permutation: WP_after_materialization_enable R1_refresh WP_after_materialization_release R3_refresh
748+
R5: LOG: statement:
749+
SET SESSION lock_timeout = '500ms';
750+
SET SESSION deadlock_timeout = '500ms';
751+
SET SESSION client_min_messages = 'DEBUG1';
752+
753+
L1: WARNING: there is already a transaction in progress
754+
L2: WARNING: there is already a transaction in progress
755+
L3: WARNING: there is already a transaction in progress
756+
step WP_after_materialization_enable:
757+
SELECT debug_waitpoint_enable('after_process_cagg_materializations');
758+
759+
debug_waitpoint_enable
760+
----------------------
761+
762+
(1 row)
763+
764+
step R1_refresh:
765+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
766+
<waiting ...>
767+
step WP_after_materialization_release:
768+
SELECT debug_waitpoint_release('after_process_cagg_materializations');
769+
770+
debug_waitpoint_release
771+
-----------------------
772+
773+
(1 row)
774+
775+
step R1_refresh: <... completed>
776+
step R3_refresh:
777+
CALL refresh_continuous_aggregate('cond_10', 70, 107);
778+

tsl/test/isolation/specs/cagg_concurrent_refresh.spec

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,21 +113,21 @@ setup
113113
FROM _timescaledb_catalog.continuous_agg
114114
WHERE user_view_name = cagg
115115
INTO mattable;
116-
EXECUTE format('LOCK table %s IN ROW EXCLUSIVE MODE', mattable);
116+
EXECUTE format('LOCK table %s IN SHARE UPDATE EXCLUSIVE MODE', mattable);
117117
END; $$ LANGUAGE plpgsql;
118118

119-
CREATE TABLE killpid (
119+
CREATE TABLE cancelpid (
120120
pid INTEGER NOT NULL PRIMARY KEY
121121
);
122-
CREATE OR REPLACE PROCEDURE killpids() AS
122+
CREATE OR REPLACE PROCEDURE cancelpids() AS
123123
$$
124124
BEGIN
125-
PERFORM pg_terminate_backend(pid) FROM killpid;
126-
WHILE EXISTS (SELECT FROM pg_stat_activity WHERE pid IN (SELECT pid FROM killpid))
125+
PERFORM pg_cancel_backend(pid) FROM cancelpid;
126+
WHILE EXISTS (SELECT FROM pg_stat_activity WHERE pid IN (SELECT pid FROM cancelpid) AND state = 'active')
127127
LOOP
128128
PERFORM pg_sleep(0.01);
129129
END LOOP;
130-
DELETE FROM killpid;
130+
DELETE FROM cancelpid;
131131
END;
132132
$$ LANGUAGE plpgsql;
133133

@@ -176,27 +176,47 @@ setup
176176
teardown {
177177
DROP TABLE conditions CASCADE;
178178
DROP TABLE conditions2 CASCADE;
179-
DROP TABLE killpid;
179+
DROP TABLE cancelpid;
180180
}
181181

182182
# Waitpoint for cagg invalidation logs
183-
session "WP"
184-
step "WP_enable"
183+
session "WP_after"
184+
step "WP_after_enable"
185185
{
186186
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');
187187
}
188-
step "WP_release"
188+
step "WP_after_release"
189189
{
190190
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');
191191
}
192192

193+
session "WP_before"
194+
step "WP_before_enable"
195+
{
196+
SELECT debug_waitpoint_enable('before_process_cagg_invalidations_for_refresh_lock');
197+
}
198+
step "WP_before_release"
199+
{
200+
SELECT debug_waitpoint_release('before_process_cagg_invalidations_for_refresh_lock');
201+
}
202+
203+
session "WP_after_materialization"
204+
step "WP_after_materialization_enable"
205+
{
206+
SELECT debug_waitpoint_enable('after_process_cagg_materializations');
207+
}
208+
step "WP_after_materialization_release"
209+
{
210+
SELECT debug_waitpoint_release('after_process_cagg_materializations');
211+
}
212+
193213
# Session to refresh the cond_10 continuous aggregate
194214
session "R1"
195215
setup
196216
{
197217
SET SESSION lock_timeout = '500ms';
198218
SET SESSION deadlock_timeout = '500ms';
199-
INSERT INTO killpid VALUES (pg_backend_pid())
219+
INSERT INTO cancelpid VALUES (pg_backend_pid())
200220
ON CONFLICT (pid) DO NOTHING;
201221
}
202222
step "R1_refresh"
@@ -364,9 +384,9 @@ step "S1_select"
364384
}
365385

366386
session "K1"
367-
step "K1_killpid"
387+
step "K1_cancelpid"
368388
{
369-
CALL killpids();
389+
CALL cancelpids();
370390
}
371391

372392
####################################################################
@@ -396,7 +416,7 @@ permutation "R3_refresh" "L2_read_lock_threshold_table" "R1_refresh" "L2_read_un
396416
##################################################################
397417
#
398418
# Tests for concurrent refreshes of continuous aggregates (second
399-
# transaction of a refresh).
419+
# and third transactions of a refresh).
400420
#
401421
##################################################################
402422

@@ -406,8 +426,8 @@ permutation "L3_lock_cagg_table" "R1_refresh" "L3_unlock_cagg_table" "S1_select"
406426
# R1 and R2 queued to refresh
407427
permutation "L3_lock_cagg_table" "R1_refresh" "R2_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"
408428

409-
# R1 and R3 don't have overlapping refresh windows, but should skip
410-
# locks and process the materialization.
429+
# R1 and R3 don't have overlapping refresh windows, but should serialize
430+
# anyway cause we're locking the cagg hypertable
411431
permutation "L3_lock_cagg_table" "R1_refresh" "R3_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"
412432

413433
# Concurrent refreshing across two different aggregates on same
@@ -420,8 +440,15 @@ permutation "R1_refresh" "R12_refresh"
420440

421441
# CAgg invalidation logs processing in a separated transaction and the materialization
422442
# transaction can be executed concurrently
423-
permutation "WP_enable" "R1_refresh"("WP_enable") "R6_pending_materialization_ranges" "R5_refresh"("WP_enable") "R6_pending_materialization_ranges" "WP_release" "R6_pending_materialization_ranges" "S1_select"
443+
permutation "WP_after_enable" "R1_refresh"("WP_after_enable") "R6_pending_materialization_ranges" "R5_refresh"("WP_after_enable") "R6_pending_materialization_ranges" "WP_after_release" "R6_pending_materialization_ranges" "S1_select"
424444

425445
# CAgg materialization phase (third trasaction of the refresh procedure) terminated by another session and then
426446
# refreshing again and make sure the pending ranges will be processed
427-
permutation "WP_enable" "R6_pending_materialization_ranges" "R1_refresh"("WP_enable") "R3_refresh"("WP_enable") "K1_killpid"("R1_refresh") "R6_pending_materialization_ranges" "WP_release" "R13_refresh"("K1_killpid") "R6_pending_materialization_ranges"
447+
permutation "WP_after_enable" "R6_pending_materialization_ranges" "R1_refresh"("WP_after_enable") "R3_refresh"("WP_after_enable") "K1_cancelpid"("R1_refresh") "R6_pending_materialization_ranges" "WP_after_release" "R13_refresh"("K1_cancelpid") "R6_pending_materialization_ranges"
448+
449+
# R3 should wait for R1 to finish because there are cagg invalidation rows locked
450+
permutation "WP_before_enable" "R1_refresh"("WP_before_enable") "R3_refresh" "WP_before_release"
451+
452+
# Concurrent refresh of caggs on non-overlapping ranges should not
453+
# block each other in the third transaction (materialization)
454+
permutation "WP_after_materialization_enable" "R1_refresh"("WP_after_materialization_enable") "WP_after_materialization_release" "R3_refresh"

0 commit comments

Comments
 (0)