Skip to content

Commit 883c70e

Browse files
committed
Concurrent CAgg refresh improvements
In timescale#8117 we allowed concurrent CAgg refreshes by relaxing the strong lock taken when processing the invalidation logs: the rows are locked individually and skipped when the lock is already taken. The problem was that, when there was only one invalidation log entry to either expand or contract, the first session took the lock and the second session skipped the locked row and reported to the user that the CAgg was up-to-date, which is wrong. To fix this wrong behavior we've split the second transaction (data materialization) into two transactions: 1. process the CAgg invalidation logs (expand/contract) and insert the ranges to be materialized into a new metadata table named `_timescaledb_catalog.continuous_aggs_materialization_ranges`, so that the materialization is processed in the next transaction; 2. process the materialization by reading ranges from the new metadata table using FOR UPDATE SKIP LOCKED, executing the materialization and, at the end, removing the row from the new metadata table. This new approach is like a producer/consumer pattern, where the first transaction produces items in the queue to be consumed and processed by the next transaction.
1 parent 51cff23 commit 883c70e

File tree

15 files changed

+538
-192
lines changed

15 files changed

+538
-192
lines changed

.unreleased/pr_8514

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implements: #8514 Concurrent Continuous Aggregates improvements

sql/pre_install/tables.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,18 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs
442442

443443
CREATE INDEX continuous_aggs_materialization_invalidation_log_idx ON _timescaledb_catalog.continuous_aggs_materialization_invalidation_log (materialization_id, lowest_modified_value ASC);
444444

-- cagg materialization ranges
--
-- Ranges waiting to be materialized for a continuous aggregate. A refresh
-- inserts the ranges produced while processing the invalidation logs, and a
-- later transaction reads them, runs the materialization and removes the row.
CREATE TABLE _timescaledb_catalog.continuous_aggs_materialization_ranges (
    materialization_id integer,
    lowest_modified_value bigint NOT NULL,
    greatest_modified_value bigint NOT NULL,
    -- table constraints
    -- rows are removed together with the continuous aggregate they belong to
    CONSTRAINT continuous_aggs_materialization_ranges_materialization_id_fkey
        FOREIGN KEY (materialization_id)
        REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id)
        ON DELETE CASCADE
);

-- register the table so that its contents are included in dumps
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_materialization_ranges', '');

CREATE INDEX continuous_aggs_materialization_ranges_idx
    ON _timescaledb_catalog.continuous_aggs_materialization_ranges (materialization_id, lowest_modified_value ASC);
445457

446458
/* the source of this data is the enum from the source code that lists
447459
* the algorithms. This table is NOT dumped.

sql/updates/latest-dev.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,18 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.indexes_local_size;
244244
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.chunk_index;
245245
DROP TABLE IF EXISTS _timescaledb_catalog.chunk_index;
246246

-- cagg materialization ranges
--
-- Update-script counterpart of the table definition in pre_install/tables.sql:
-- ranges waiting to be materialized for a continuous aggregate.
CREATE TABLE _timescaledb_catalog.continuous_aggs_materialization_ranges (
    materialization_id integer,
    lowest_modified_value bigint NOT NULL,
    greatest_modified_value bigint NOT NULL,
    -- table constraints
    -- rows are removed together with the continuous aggregate they belong to
    CONSTRAINT continuous_aggs_materialization_ranges_materialization_id_fkey
        FOREIGN KEY (materialization_id)
        REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id)
        ON DELETE CASCADE
);

-- register the table so that its contents are included in dumps
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_materialization_ranges', '');

CREATE INDEX continuous_aggs_materialization_ranges_idx
    ON _timescaledb_catalog.continuous_aggs_materialization_ranges (materialization_id, lowest_modified_value ASC);

GRANT SELECT
    ON TABLE _timescaledb_catalog.continuous_aggs_materialization_ranges
    TO PUBLIC;
261+

sql/updates/reverse-dev.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,26 @@ GRANT SELECT ON TABLE _timescaledb_catalog.chunk_index TO PUBLIC;
229229
DROP FUNCTION IF EXISTS _timescaledb_functions.chunk_status_text(regclass);
230230
DROP FUNCTION IF EXISTS _timescaledb_functions.chunk_status_text(int);
231231

-- Refuse to downgrade while materialization ranges are still pending: the
-- statements that follow drop continuous_aggs_materialization_ranges, so any
-- queued range would be lost without ever being materialized.
DO
$$
DECLARE
    -- comma-separated, schema-qualified names of the CAggs with pending ranges
    caggs_to_refresh TEXT;
BEGIN
    IF EXISTS (SELECT FROM _timescaledb_catalog.continuous_aggs_materialization_ranges LIMIT 1) THEN
        -- Drive the query from continuous_agg and use a semi-join so that every
        -- CAgg is listed exactly once, no matter how many ranges it has queued
        -- (a plain join would repeat the name once per pending range).
        SELECT string_agg(format('%I.%I', cagg.user_view_schema, cagg.user_view_name), ', ' ORDER BY cagg.user_view_schema, cagg.user_view_name)
        INTO caggs_to_refresh
        FROM _timescaledb_catalog.continuous_agg AS cagg
        WHERE EXISTS (
            SELECT
            FROM _timescaledb_catalog.continuous_aggs_materialization_ranges AS ranges
            WHERE ranges.materialization_id = cagg.mat_hypertable_id
        );

        RAISE EXCEPTION 'cannot downgrade because there are pending CAgg refreshes'
        USING
            ERRCODE = 'object_not_in_prerequisite_state',
            DETAIL = format('Please refresh the CAggs before downgrade: %s.', caggs_to_refresh);
    END IF;
END;
$$;
250+
251+
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_materialization_ranges;
252+
253+
DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_materialization_ranges;
254+

src/compat/compat.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,3 +845,19 @@ initReadOnlyStringInfo(StringInfo str, char *data, int len)
845845
tmfd, \
846846
is_merge_delete)
847847
#endif
848+
/*
 * PG16 consolidated the ItemPointer <-> Datum conversion helpers; provide the
 * same helpers here when building against older PostgreSQL versions.
 * https://github.com/postgres/postgres/commit/bd944884e92a
 */
#if PG16_LT
/* Interpret a Datum as a pointer to an ItemPointerData. */
static inline ItemPointer
DatumGetItemPointer(Datum datum)
{
	return (ItemPointer) DatumGetPointer(datum);
}

/* Wrap a pointer to an ItemPointerData into a Datum. */
static inline Datum
ItemPointerGetDatum(const ItemPointerData *tid)
{
	return PointerGetDatum(tid);
}
#endif

src/ts_catalog/catalog.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
8585
.schema_name = CATALOG_SCHEMA_NAME,
8686
.table_name = CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG_TABLE_NAME,
8787
},
88+
[CONTINUOUS_AGGS_MATERIALIZATION_RANGES] = {
89+
.schema_name = CATALOG_SCHEMA_NAME,
90+
.table_name = CONTINUOUS_AGGS_MATERIALIZATION_RANGES_TABLE_NAME,
91+
},
8892
[COMPRESSION_SETTINGS] = {
8993
.schema_name = CATALOG_SCHEMA_NAME,
9094
.table_name = COMPRESSION_SETTINGS_TABLE_NAME,
@@ -227,6 +231,12 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
227231
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG_IDX] = "continuous_aggs_materialization_invalidation_log_idx",
228232
},
229233
},
234+
[CONTINUOUS_AGGS_MATERIALIZATION_RANGES] = {
235+
.length = _MAX_CONTINUOUS_AGGS_MATERIALIZATION_RANGES_INDEX,
236+
.names = (char *[]) {
237+
[CONTINUOUS_AGGS_MATERIALIZATION_RANGES_IDX] = "continuous_aggs_materialization_ranges_idx",
238+
},
239+
},
230240
[CONTINUOUS_AGGS_WATERMARK] = {
231241
.length = _MAX_CONTINUOUS_AGGS_WATERMARK_INDEX,
232242
.names = (char *[]) {

src/ts_catalog/catalog.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ typedef enum CatalogTable
4747
CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG,
4848
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
4949
CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG,
50+
CONTINUOUS_AGGS_MATERIALIZATION_RANGES,
5051
COMPRESSION_SETTINGS,
5152
COMPRESSION_CHUNK_SIZE,
5253
CONTINUOUS_AGGS_BUCKET_FUNCTION,
@@ -1072,6 +1073,44 @@ typedef enum Anum_continuous_aggs_materialization_invalidation_log_idx
10721073
#define Natts_continuous_aggs_materialization_invalidation_log_idx \
10731074
(_Anum_continuous_aggs_materialization_invalidation_log_idx_max - 1)
10741075

1076+
/****** CONTINUOUS_AGGS_MATERIALIZATION_RANGES_TABLE definitions*/
1077+
#define CONTINUOUS_AGGS_MATERIALIZATION_RANGES_TABLE_NAME "continuous_aggs_materialization_ranges"
1078+
typedef enum Anum_continuous_aggs_materialization_ranges
1079+
{
1080+
Anum_continuous_aggs_materialization_ranges_materialization_id = 1,
1081+
Anum_continuous_aggs_materialization_ranges_lowest_modified_value,
1082+
Anum_continuous_aggs_materialization_ranges_greatest_modified_value,
1083+
_Anum_continuous_aggs_materialization_ranges_max,
1084+
} Anum_continuous_aggs_materialization_ranges;
1085+
1086+
#define Natts_continuous_aggs_materialization_ranges \
1087+
(_Anum_continuous_aggs_materialization_ranges_max - 1)
1088+
1089+
typedef struct FormData_continuous_aggs_materialization_ranges
1090+
{
1091+
int32 materialization_id;
1092+
int64 lowest_modified_value;
1093+
int64 greatest_modified_value;
1094+
} FormData_continuous_aggs_materialization_ranges;
1095+
1096+
typedef FormData_continuous_aggs_materialization_ranges
1097+
*Form_continuous_aggs_materialization_ranges;
1098+
1099+
enum
1100+
{
1101+
CONTINUOUS_AGGS_MATERIALIZATION_RANGES_IDX = 0,
1102+
_MAX_CONTINUOUS_AGGS_MATERIALIZATION_RANGES_INDEX,
1103+
};
1104+
typedef enum Anum_continuous_aggs_materialization_ranges_idx
1105+
{
1106+
Anum_continuous_aggs_materialization_ranges_idx_materialization_id = 1,
1107+
Anum_continuous_aggs_materialization_ranges_idx_lowest_modified_value,
1108+
_Anum_continuous_aggs_materialization_ranges_idx_max,
1109+
} Anum_continuous_aggs_materialization_ranges_idx;
1110+
1111+
#define Natts_continuous_aggs_materialization_ranges_idx \
1112+
(_Anum_continuous_aggs_materialization_ranges_idx_max - 1)
1113+
10751114
/****** CONTINUOUS_AGGS_WATERMARK_TABLE definitions*/
10761115
#define CONTINUOUS_AGGS_WATERMARK_TABLE_NAME "continuous_aggs_watermark"
10771116
typedef enum Anum_continuous_aggs_watermark

src/ts_catalog/continuous_aggs_watermark.c

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,16 @@ cagg_watermark_update_scan_internal(TupleInfo *ti, void *data)
243243
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
244244
Form_continuous_aggs_watermark form = (Form_continuous_aggs_watermark) GETSTRUCT(tuple);
245245

246+
/* If the tuple was modified concurrently, retry the operation and use a new snapshot
247+
* to see the updated tuple. */
248+
if (ti->lockresult == TM_Updated)
249+
return SCAN_RESTART_WITH_NEW_SNAPSHOT;
250+
251+
Ensure(ti->lockresult == TM_Ok,
252+
"unable to lock watermark tuple for cagg %d (lock result %d)",
253+
watermark_update->ht_relid,
254+
ti->lockresult);
255+
246256
if (watermark_update->watermark > form->watermark || watermark_update->force_update)
247257
{
248258
HeapTuple new_tuple = heap_copytuple(tuple);
@@ -284,27 +294,27 @@ static void
284294
cagg_watermark_update_internal(int32 mat_hypertable_id, Oid ht_relid, int64 new_watermark,
285295
bool force_update, bool invalidate_rel_cache)
286296
{
287-
bool watermark_updated;
288-
ScanKeyData scankey[1];
289297
WatermarkUpdate data = { .watermark = new_watermark,
290298
.force_update = force_update,
291299
.invalidate_rel_cache = invalidate_rel_cache,
292300
.ht_relid = ht_relid };
301+
ScanIterator iterator =
302+
ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, RowExclusiveLock, CurrentMemoryContext);
293303

294-
ScanKeyInit(&scankey[0],
295-
Anum_continuous_aggs_watermark_mat_hypertable_id,
296-
BTEqualStrategyNumber,
297-
F_INT4EQ,
298-
Int32GetDatum(mat_hypertable_id));
299-
300-
watermark_updated = ts_catalog_scan_one(CONTINUOUS_AGGS_WATERMARK /*=table*/,
301-
CONTINUOUS_AGGS_WATERMARK_PKEY /*=indexid*/,
302-
scankey /*=scankey*/,
303-
1 /*=num_keys*/,
304-
cagg_watermark_update_scan_internal /*=tuple_found*/,
305-
RowExclusiveLock /*=lockmode*/,
306-
CONTINUOUS_AGGS_WATERMARK_TABLE_NAME /*=table_name*/,
307-
&data /*=data*/);
304+
cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, mat_hypertable_id);
305+
iterator.ctx.tuple_found = cagg_watermark_update_scan_internal;
306+
iterator.ctx.data = &data;
307+
iterator.ctx.snapshot = GetLatestSnapshot();
308+
ScanTupLock scantuplock = {
309+
.waitpolicy = LockWaitBlock,
310+
.lockmode = LockTupleExclusive,
311+
.lockflags = TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
312+
};
313+
iterator.ctx.tuplock = &scantuplock;
314+
iterator.ctx.flags = SCANNER_F_KEEPLOCK;
315+
316+
bool watermark_updated =
317+
ts_scanner_scan_one(&iterator.ctx, false, "continuous aggregate watermark");
308318

309319
if (!watermark_updated)
310320
{

test/expected/drop_rename_hypertable.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ SELECT schema, name FROM test.relation WHERE schema IN ('public', '_timescaledb_
200200
_timescaledb_catalog | continuous_aggs_hypertable_invalidation_log
201201
_timescaledb_catalog | continuous_aggs_invalidation_threshold
202202
_timescaledb_catalog | continuous_aggs_materialization_invalidation_log
203+
_timescaledb_catalog | continuous_aggs_materialization_ranges
203204
_timescaledb_catalog | continuous_aggs_watermark
204205
_timescaledb_catalog | dimension
205206
_timescaledb_catalog | dimension_slice
@@ -212,7 +213,7 @@ SELECT schema, name FROM test.relation WHERE schema IN ('public', '_timescaledb_
212213
_timescaledb_internal | bgw_policy_chunk_stats
213214
_timescaledb_internal | compressed_chunk_stats
214215
_timescaledb_internal | hypertable_chunk_local_size
215-
(25 rows)
216+
(26 rows)
216217

217218
-- Test that renaming ordinary table works
218219
CREATE TABLE renametable (foo int);

0 commit comments

Comments
 (0)