Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .unreleased/pr_8607
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixes: #8607 Fix interrupted CAgg refresh materialization phase leaving behind pending materialization ranges
Thanks: @snyrkill for reporting a bug when interrupting a CAgg refresh
64 changes: 63 additions & 1 deletion tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef enum MaterializationPlanType
PLAN_TYPE_MERGE_DELETE,
PLAN_TYPE_RANGES_SELECT,
PLAN_TYPE_RANGES_DELETE,
PLAN_TYPE_RANGES_PENDING,
_MAX_MATERIALIZATION_PLAN_TYPES
} MaterializationPlanType;

Expand Down Expand Up @@ -91,13 +92,15 @@ static char *create_materialization_merge_statement(MaterializationContext *cont
static char *create_materialization_merge_delete_statement(MaterializationContext *context);
static char *create_materialization_ranges_select_statement(MaterializationContext *context);
static char *create_materialization_ranges_delete_statement(MaterializationContext *context);
static char *create_materialization_ranges_pending_statement(MaterializationContext *context);

static void emit_materialization_insert_error(MaterializationContext *context);
static void emit_materialization_delete_error(MaterializationContext *context);
static void emit_materialization_exists_error(MaterializationContext *context);
static void emit_materialization_merge_error(MaterializationContext *context);
static void emit_materialization_ranges_select_error(MaterializationContext *context);
static void emit_materialization_ranges_delete_error(MaterializationContext *context);
static void emit_materialization_ranges_pending_error(MaterializationContext *context);

static void emit_materialization_insert_progress(MaterializationContext *context,
uint64 rows_processed);
Expand Down Expand Up @@ -137,6 +140,11 @@ static MaterializationPlan materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES
.create_statement =
create_materialization_ranges_delete_statement,
.emit_error = emit_materialization_ranges_delete_error },
[PLAN_TYPE_RANGES_PENDING] = { .read_only = true,
.nargs = 3,
.create_statement =
create_materialization_ranges_pending_statement,
.emit_error = emit_materialization_ranges_pending_error },
};

static Oid *create_materialization_plan_argtypes(MaterializationContext *context,
Expand Down Expand Up @@ -201,6 +209,32 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
AtEOXact_GUC(false, save_nestlevel);
}

/* API to check for pending materialization ranges */
bool
continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
InternalTimeRange materialization_range)
{
MaterializationContext context = {
.cagg = cagg,
.internal_materialization_range = materialization_range,
};

/* Lock down search_path */
int save_nestlevel = NewGUCNestLevel();
RestrictSearchPath();

if (materialization_range.start > materialization_range.end)
materialization_range.start = materialization_range.end;

bool has_pending_materializations =
(execute_materialization_plan(&context, PLAN_TYPE_RANGES_PENDING) > 0);

/* Restore search_path */
AtEOXact_GUC(false, save_nestlevel);

return has_pending_materializations;
}

static Datum
time_range_internal_to_min_time_value(Oid type)
{
Expand Down Expand Up @@ -558,6 +592,23 @@ create_materialization_ranges_delete_statement(MaterializationContext *context)
return query.data;
}

static char *
create_materialization_ranges_pending_statement(MaterializationContext *context)
{
StringInfoData query;
initStringInfo(&query);

appendStringInfo(&query,
"SELECT * "
"FROM _timescaledb_catalog.continuous_aggs_materialization_ranges "
"WHERE materialization_id = $1 "
"AND pg_catalog.int8range(lowest_modified_value, greatest_modified_value) && "
"pg_catalog.int8range($2, $3) "
"LIMIT 1 ");

return query.data;
}

static void
emit_materialization_insert_error(MaterializationContext *context)
{
Expand Down Expand Up @@ -612,6 +663,15 @@ emit_materialization_ranges_delete_error(MaterializationContext *context)
NameStr(*context->materialization_table.name));
}

static void
emit_materialization_ranges_pending_error(MaterializationContext *context)
{
elog(ERROR,
"could not select pending materialization ranges \"%s.%s\"",
NameStr(*context->materialization_table.schema),
NameStr(*context->materialization_table.name));
}

static void
emit_materialization_insert_progress(MaterializationContext *context, uint64 rows_processed)
{
Expand Down Expand Up @@ -651,7 +711,8 @@ create_materialization_plan_argtypes(MaterializationContext *context,
switch (plan_type)
{
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
argtypes[0] = INT4OID; /* materialization_id */
case PLAN_TYPE_RANGES_PENDING:
argtypes[0] = INT4OID; /* materialization_id */
argtypes[1] = INT8OID;
argtypes[2] = INT8OID;
break;
Expand Down Expand Up @@ -703,6 +764,7 @@ create_materialization_plan_args(MaterializationContext *context, Materializatio
switch (plan_type)
{
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
case PLAN_TYPE_RANGES_PENDING:
{
(*values)[0] = Int32GetDatum(context->cagg->data.mat_hypertable_id);
(*values)[1] = Int64GetDatum(context->internal_materialization_range.start);
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/continuous_aggs/materialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousA
SchemaAndName materialization_table,
const NameData *time_column_name,
InternalTimeRange materialization_range, int32 chunk_id);
bool continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
InternalTimeRange materialization_range);
36 changes: 31 additions & 5 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
context,
INVALID_CHUNK_ID,
force))
bool refreshed = process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
context,
INVALID_CHUNK_ID,
force);

bool has_pending_materializations =
continuous_agg_has_pending_materializations(cagg, refresh_window);

if (has_pending_materializations)
{
ContinuousAggRefreshState refresh;
continuous_agg_refresh_init(&refresh, cagg, &refresh_window);

InternalTimeRange invalidation = {
.type = refresh_window.type,
.start = refresh_window.start,
/* Invalidations are inclusive at the end, while refresh windows
* aren't, so add one to the end of the invalidated region */
.end = ts_time_saturating_add(refresh_window.end, 1, refresh_window.type),
};

InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(cagg,
&invalidation,
cagg->bucket_function);

continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, INVALID_CHUNK_ID);
}

if (!refreshed && !has_pending_materializations)
emit_up_to_date_notice(cagg, context);

/* Restore search_path */
Expand Down
116 changes: 77 additions & 39 deletions tsl/test/isolation/expected/cagg_concurrent_refresh.out
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Parsed test spec with 12 sessions
Parsed test spec with 14 sessions

starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
Expand Down Expand Up @@ -356,7 +356,6 @@ step L3_unlock_cagg_table:
ROLLBACK;

step R1_refresh: <... completed>
R2: NOTICE: continuous aggregate "cond_10" is already up-to-date
step R2_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
Expand Down Expand Up @@ -545,7 +544,7 @@ step R12_refresh:
CALL refresh_continuous_aggregate('cond2_10', 25, 70);


starting permutation: WP_enable R1_refresh R6_materialization_ranges R5_refresh R6_materialization_ranges WP_release R6_materialization_ranges S1_select
starting permutation: WP_enable R1_refresh R6_pending_materialization_ranges R5_refresh R6_pending_materialization_ranges WP_release R6_pending_materialization_ranges S1_select
R5: LOG: statement:
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
Expand All @@ -565,18 +564,8 @@ debug_waitpoint_enable
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;
step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
Expand All @@ -589,18 +578,8 @@ R5: LOG: statement:
step R5_refresh:
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;
step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
Expand All @@ -621,18 +600,8 @@ R5: DEBUG: continuous aggregate refresh (individual invalidation) on "cond_10"
R5: LOG: deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_X"
R5: LOG: inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_X"
step R5_refresh: <... completed>
step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;
step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
Expand Down Expand Up @@ -675,3 +644,72 @@ conditions | 100
conditions2|-2147483648
(2 rows)


starting permutation: WP_enable R6_pending_materialization_ranges R1_refresh R3_refresh K1_killpid R6_pending_materialization_ranges WP_release R13_refresh R6_pending_materialization_ranges
R5: LOG: statement:
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
SET SESSION client_min_messages = 'DEBUG1';

L1: WARNING: there is already a transaction in progress
L2: WARNING: there is already a transaction in progress
L3: WARNING: there is already a transaction in progress
step WP_enable:
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_enable
----------------------

(1 row)

step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
(0 rows)

step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step K1_killpid:
CALL killpids();
<waiting ...>
step R1_refresh: <... completed>
FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
cond_10 | 30| 70
cond_10 | 70| 100
(2 rows)

step K1_killpid: <... completed>
step WP_release:
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_release
-----------------------

(1 row)

step R3_refresh: <... completed>
step R13_refresh:
CALL refresh_continuous_aggregate('cond_10', 25, 70);

step R6_pending_materialization_ranges:
SELECT * FROM pending_materialization_ranges WHERE user_view_name = 'cond_10';

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
(0 rows)

Loading
Loading