Skip to content

Commit a0a109d

Browse files
Fix interrupted CAgg refresh materialization phase
In #8514 we improved concurrent CAgg refreshes by splitting the second transaction (invalidation processing and data materialization) into two separated transactions. But now when interrupting the third transaction (data materialization) we'll left behind pending materialization ranges in the new metada table `continuous_aggs_materialization_ranges`. Fixed it by properly checking the existance of pending materialization ranges and if it exists execute the materialization.
1 parent 0f7b4ca commit a0a109d

File tree

6 files changed

+227
-61
lines changed

6 files changed

+227
-61
lines changed

.unreleased/pr_8607

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fixes: #8607 Fix interrupted CAgg refresh materialization phase leaving behind pending materialization ranges
2+
Thanks: @snyrkill for reporting a bug when interrupting a CAgg refresh

tsl/src/continuous_aggs/materialize.c

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ typedef enum MaterializationPlanType
5252
PLAN_TYPE_MERGE_DELETE,
5353
PLAN_TYPE_RANGES_SELECT,
5454
PLAN_TYPE_RANGES_DELETE,
55+
PLAN_TYPE_RANGES_PENDING,
5556
_MAX_MATERIALIZATION_PLAN_TYPES
5657
} MaterializationPlanType;
5758

@@ -91,13 +92,15 @@ static char *create_materialization_merge_statement(MaterializationContext *cont
9192
static char *create_materialization_merge_delete_statement(MaterializationContext *context);
9293
static char *create_materialization_ranges_select_statement(MaterializationContext *context);
9394
static char *create_materialization_ranges_delete_statement(MaterializationContext *context);
95+
static char *create_materialization_ranges_pending_statement(MaterializationContext *context);
9496

9597
static void emit_materialization_insert_error(MaterializationContext *context);
9698
static void emit_materialization_delete_error(MaterializationContext *context);
9799
static void emit_materialization_exists_error(MaterializationContext *context);
98100
static void emit_materialization_merge_error(MaterializationContext *context);
99101
static void emit_materialization_ranges_select_error(MaterializationContext *context);
100102
static void emit_materialization_ranges_delete_error(MaterializationContext *context);
103+
static void emit_materialization_ranges_pending_error(MaterializationContext *context);
101104

102105
static void emit_materialization_insert_progress(MaterializationContext *context,
103106
uint64 rows_processed);
@@ -137,6 +140,11 @@ static MaterializationPlan materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES
137140
.create_statement =
138141
create_materialization_ranges_delete_statement,
139142
.emit_error = emit_materialization_ranges_delete_error },
143+
[PLAN_TYPE_RANGES_PENDING] = { .read_only = true,
144+
.nargs = 3,
145+
.create_statement =
146+
create_materialization_ranges_pending_statement,
147+
.emit_error = emit_materialization_ranges_pending_error },
140148
};
141149

142150
static Oid *create_materialization_plan_argtypes(MaterializationContext *context,
@@ -201,6 +209,32 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
201209
AtEOXact_GUC(false, save_nestlevel);
202210
}
203211

212+
/* API to check for pending materialization ranges */
213+
bool
214+
continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
215+
InternalTimeRange materialization_range)
216+
{
217+
MaterializationContext context = {
218+
.cagg = cagg,
219+
.internal_materialization_range = materialization_range,
220+
};
221+
222+
/* Lock down search_path */
223+
int save_nestlevel = NewGUCNestLevel();
224+
RestrictSearchPath();
225+
226+
if (materialization_range.start > materialization_range.end)
227+
materialization_range.start = materialization_range.end;
228+
229+
bool has_pending_materializations =
230+
(execute_materialization_plan(&context, PLAN_TYPE_RANGES_PENDING) > 0);
231+
232+
/* Restore search_path */
233+
AtEOXact_GUC(false, save_nestlevel);
234+
235+
return has_pending_materializations;
236+
}
237+
204238
static Datum
205239
time_range_internal_to_min_time_value(Oid type)
206240
{
@@ -558,6 +592,23 @@ create_materialization_ranges_delete_statement(MaterializationContext *context)
558592
return query.data;
559593
}
560594

595+
static char *
596+
create_materialization_ranges_pending_statement(MaterializationContext *context)
597+
{
598+
StringInfoData query;
599+
initStringInfo(&query);
600+
601+
appendStringInfo(&query,
602+
"SELECT * "
603+
"FROM _timescaledb_catalog.continuous_aggs_materialization_ranges "
604+
"WHERE materialization_id = $1 "
605+
"AND pg_catalog.int8range(lowest_modified_value, greatest_modified_value) && "
606+
"pg_catalog.int8range($2, $3) "
607+
"LIMIT 1 ");
608+
609+
return query.data;
610+
}
611+
561612
static void
562613
emit_materialization_insert_error(MaterializationContext *context)
563614
{
@@ -612,6 +663,15 @@ emit_materialization_ranges_delete_error(MaterializationContext *context)
612663
NameStr(*context->materialization_table.name));
613664
}
614665

666+
static void
667+
emit_materialization_ranges_pending_error(MaterializationContext *context)
668+
{
669+
elog(ERROR,
670+
"could not select pending materialization ranges \"%s.%s\"",
671+
NameStr(*context->materialization_table.schema),
672+
NameStr(*context->materialization_table.name));
673+
}
674+
615675
static void
616676
emit_materialization_insert_progress(MaterializationContext *context, uint64 rows_processed)
617677
{
@@ -651,7 +711,8 @@ create_materialization_plan_argtypes(MaterializationContext *context,
651711
switch (plan_type)
652712
{
653713
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
654-
argtypes[0] = INT4OID; /* materialization_id */
714+
case PLAN_TYPE_RANGES_PENDING:
715+
argtypes[0] = INT4OID; /* materialization_id */
655716
argtypes[1] = INT8OID;
656717
argtypes[2] = INT8OID;
657718
break;
@@ -703,6 +764,7 @@ create_materialization_plan_args(MaterializationContext *context, Materializatio
703764
switch (plan_type)
704765
{
705766
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
767+
case PLAN_TYPE_RANGES_PENDING:
706768
{
707769
(*values)[0] = Int32GetDatum(context->cagg->data.mat_hypertable_id);
708770
(*values)[1] = Int64GetDatum(context->internal_materialization_range.start);

tsl/src/continuous_aggs/materialize.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,5 @@ void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousA
4242
SchemaAndName materialization_table,
4343
const NameData *time_column_name,
4444
InternalTimeRange materialization_range, int32 chunk_id);
45+
bool continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
46+
InternalTimeRange materialization_range);

tsl/src/continuous_aggs/refresh.c

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -919,11 +919,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
919919

920920
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);
921921

922-
if (!process_cagg_invalidations_and_refresh(cagg,
923-
&refresh_window,
924-
context,
925-
INVALID_CHUNK_ID,
926-
force))
922+
bool refreshed = process_cagg_invalidations_and_refresh(cagg,
923+
&refresh_window,
924+
context,
925+
INVALID_CHUNK_ID,
926+
force);
927+
928+
bool has_pending_materializations =
929+
continuous_agg_has_pending_materializations(cagg, refresh_window);
930+
931+
if (has_pending_materializations)
932+
{
933+
ContinuousAggRefreshState refresh;
934+
continuous_agg_refresh_init(&refresh, cagg, &refresh_window);
935+
936+
InternalTimeRange invalidation = {
937+
.type = refresh_window.type,
938+
.start = refresh_window.start,
939+
/* Invalidations are inclusive at the end, while refresh windows
940+
* aren't, so add one to the end of the invalidated region */
941+
.end = ts_time_saturating_add(refresh_window.end, 1, refresh_window.type),
942+
};
943+
944+
InternalTimeRange bucketed_refresh_window =
945+
compute_circumscribed_bucketed_refresh_window(cagg,
946+
&invalidation,
947+
cagg->bucket_function);
948+
949+
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, INVALID_CHUNK_ID);
950+
}
951+
952+
if (!refreshed && !has_pending_materializations)
927953
emit_up_to_date_notice(cagg, context);
928954

929955
/* Restore search_path */

tsl/test/isolation/expected/cagg_concurrent_refresh.out

Lines changed: 77 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Parsed test spec with 12 sessions
1+
Parsed test spec with 14 sessions
22

33
starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
44
step R1_refresh:
@@ -356,7 +356,6 @@ step L3_unlock_cagg_table:
356356
ROLLBACK;
357357

358358
step R1_refresh: <... completed>
359-
R2: NOTICE: continuous aggregate "cond_10" is already up-to-date
360359
step R2_refresh: <... completed>
361360
step S1_select:
362361
SELECT bucket, avg_temp
@@ -545,7 +544,7 @@ step R12_refresh:
545544
CALL refresh_continuous_aggregate('cond2_10', 25, 70);
546545

547546

548-
starting permutation: WP_enable R1_refresh R6_materialization_ranges R5_refresh R6_materialization_ranges WP_release R6_materialization_ranges S1_select
547+
starting permutation: WP_enable R1_refresh R6_pending_materialization_ranges R5_refresh R6_pending_materialization_ranges WP_release R6_pending_materialization_ranges S1_select
549548
R5: LOG: statement:
550549
SET SESSION lock_timeout = '500ms';
551550
SET SESSION deadlock_timeout = '500ms';
@@ -565,18 +564,8 @@ debug_waitpoint_enable
565564
step R1_refresh:
566565
CALL refresh_continuous_aggregate('cond_10', 25, 70);
567566
<waiting ...>
568-
step R6_materialization_ranges:
569-
SELECT
570-
c.user_view_name,
571-
m.lowest_modified_value,
572-
m.greatest_modified_value
573-
FROM
574-
_timescaledb_catalog.continuous_aggs_materialization_ranges m
575-
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
576-
WHERE
577-
c.user_view_name = 'cond_10'
578-
ORDER BY
579-
1, 2, 3;
567+
step R6_pending_materialization_ranges:
568+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
580569

581570
user_view_name|lowest_modified_value|greatest_modified_value
582571
--------------+---------------------+-----------------------
@@ -589,18 +578,8 @@ R5: LOG: statement:
589578
step R5_refresh:
590579
CALL refresh_continuous_aggregate('cond_10', 70, 107);
591580
<waiting ...>
592-
step R6_materialization_ranges:
593-
SELECT
594-
c.user_view_name,
595-
m.lowest_modified_value,
596-
m.greatest_modified_value
597-
FROM
598-
_timescaledb_catalog.continuous_aggs_materialization_ranges m
599-
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
600-
WHERE
601-
c.user_view_name = 'cond_10'
602-
ORDER BY
603-
1, 2, 3;
581+
step R6_pending_materialization_ranges:
582+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
604583

605584
user_view_name|lowest_modified_value|greatest_modified_value
606585
--------------+---------------------+-----------------------
@@ -621,18 +600,8 @@ R5: DEBUG: continuous aggregate refresh (individual invalidation) on "cond_10"
621600
R5: LOG: deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_X"
622601
R5: LOG: inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_X"
623602
step R5_refresh: <... completed>
624-
step R6_materialization_ranges:
625-
SELECT
626-
c.user_view_name,
627-
m.lowest_modified_value,
628-
m.greatest_modified_value
629-
FROM
630-
_timescaledb_catalog.continuous_aggs_materialization_ranges m
631-
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
632-
WHERE
633-
c.user_view_name = 'cond_10'
634-
ORDER BY
635-
1, 2, 3;
603+
step R6_pending_materialization_ranges:
604+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
636605

637606
user_view_name|lowest_modified_value|greatest_modified_value
638607
--------------+---------------------+-----------------------
@@ -675,3 +644,72 @@ conditions | 100
675644
conditions2|-2147483648
676645
(2 rows)
677646

647+
648+
starting permutation: WP_enable R6_pending_materialization_ranges R1_refresh R3_refresh K1_killpid R6_pending_materialization_ranges WP_release R13_refresh R6_pending_materialization_ranges
649+
R5: LOG: statement:
650+
SET SESSION lock_timeout = '500ms';
651+
SET SESSION deadlock_timeout = '500ms';
652+
SET SESSION client_min_messages = 'DEBUG1';
653+
654+
L1: WARNING: there is already a transaction in progress
655+
L2: WARNING: there is already a transaction in progress
656+
L3: WARNING: there is already a transaction in progress
657+
step WP_enable:
658+
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');
659+
660+
debug_waitpoint_enable
661+
----------------------
662+
663+
(1 row)
664+
665+
step R6_pending_materialization_ranges:
666+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
667+
668+
user_view_name|lowest_modified_value|greatest_modified_value
669+
--------------+---------------------+-----------------------
670+
(0 rows)
671+
672+
step R1_refresh:
673+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
674+
<waiting ...>
675+
step R3_refresh:
676+
CALL refresh_continuous_aggregate('cond_10', 70, 107);
677+
<waiting ...>
678+
step K1_killpid:
679+
CALL killpids();
680+
<waiting ...>
681+
step R1_refresh: <... completed>
682+
FATAL: terminating connection due to administrator command
683+
server closed the connection unexpectedly
684+
This probably means the server terminated abnormally
685+
before or while processing the request.
686+
687+
step R6_pending_materialization_ranges:
688+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
689+
690+
user_view_name|lowest_modified_value|greatest_modified_value
691+
--------------+---------------------+-----------------------
692+
cond_10 | 30| 70
693+
cond_10 | 70| 100
694+
(2 rows)
695+
696+
step K1_killpid: <... completed>
697+
step WP_release:
698+
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');
699+
700+
debug_waitpoint_release
701+
-----------------------
702+
703+
(1 row)
704+
705+
step R3_refresh: <... completed>
706+
step R13_refresh:
707+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
708+
709+
step R6_pending_materialization_ranges:
710+
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';
711+
712+
user_view_name|lowest_modified_value|greatest_modified_value
713+
--------------+---------------------+-----------------------
714+
(0 rows)
715+

0 commit comments

Comments
 (0)