Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .unreleased/pr_8607
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixes: #8607 Fix interrupted CAgg refresh materialization phase leaving behind pending materialization ranges
Thanks: @snyrkill for reporting a bug when interrupting a CAgg refresh
64 changes: 63 additions & 1 deletion tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef enum MaterializationPlanType
PLAN_TYPE_MERGE_DELETE,
PLAN_TYPE_RANGES_SELECT,
PLAN_TYPE_RANGES_DELETE,
PLAN_TYPE_RANGES_PENDING,
_MAX_MATERIALIZATION_PLAN_TYPES
} MaterializationPlanType;

Expand Down Expand Up @@ -91,13 +92,15 @@ static char *create_materialization_merge_statement(MaterializationContext *cont
static char *create_materialization_merge_delete_statement(MaterializationContext *context);
static char *create_materialization_ranges_select_statement(MaterializationContext *context);
static char *create_materialization_ranges_delete_statement(MaterializationContext *context);
static char *create_materialization_ranges_pending_statement(MaterializationContext *context);

static void emit_materialization_insert_error(MaterializationContext *context);
static void emit_materialization_delete_error(MaterializationContext *context);
static void emit_materialization_exists_error(MaterializationContext *context);
static void emit_materialization_merge_error(MaterializationContext *context);
static void emit_materialization_ranges_select_error(MaterializationContext *context);
static void emit_materialization_ranges_delete_error(MaterializationContext *context);
static void emit_materialization_ranges_pending_error(MaterializationContext *context);

static void emit_materialization_insert_progress(MaterializationContext *context,
uint64 rows_processed);
Expand Down Expand Up @@ -137,6 +140,11 @@ static MaterializationPlan materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES
.create_statement =
create_materialization_ranges_delete_statement,
.emit_error = emit_materialization_ranges_delete_error },
[PLAN_TYPE_RANGES_PENDING] = { .read_only = true,
.nargs = 3,
.create_statement =
create_materialization_ranges_pending_statement,
.emit_error = emit_materialization_ranges_pending_error },
};

static Oid *create_materialization_plan_argtypes(MaterializationContext *context,
Expand Down Expand Up @@ -201,6 +209,32 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
AtEOXact_GUC(false, save_nestlevel);
}

/*
 * Check whether a continuous aggregate has pending materialization ranges
 * overlapping the given range.
 *
 * Executes the PLAN_TYPE_RANGES_PENDING plan under a restricted search_path.
 * The plan is parameterized with the continuous aggregate's materialization
 * hypertable id and the start/end of the range.
 *
 * cagg: continuous aggregate to check.
 * materialization_range: range (internal time representation) to test.
 *
 * Returns true when execute_materialization_plan() reports a value greater
 * than zero (presumably the number of rows found -- confirm against
 * execute_materialization_plan()).
 */
bool
continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
											InternalTimeRange materialization_range)
{
	/* Lock down search_path */
	int save_nestlevel = NewGUCNestLevel();
	RestrictSearchPath();

	/*
	 * Pin the start of an inverted range to its end. This has to happen
	 * before the range is copied into the context below; clamping the
	 * by-value parameter afterwards would never reach the query arguments.
	 */
	if (materialization_range.start > materialization_range.end)
		materialization_range.start = materialization_range.end;

	MaterializationContext context = {
		.cagg = cagg,
		.internal_materialization_range = materialization_range,
	};

	bool has_pending_materializations =
		(execute_materialization_plan(&context, PLAN_TYPE_RANGES_PENDING) > 0);

	/* Restore search_path */
	AtEOXact_GUC(false, save_nestlevel);

	return has_pending_materializations;
}

static Datum
time_range_internal_to_min_time_value(Oid type)
{
Expand Down Expand Up @@ -558,6 +592,23 @@ create_materialization_ranges_delete_statement(MaterializationContext *context)
return query.data;
}

/*
 * Build the SQL text of the "pending ranges" plan.
 *
 * The statement selects at most one row from
 * _timescaledb_catalog.continuous_aggs_materialization_ranges whose
 * materialization_id equals $1 and whose
 * [lowest_modified_value, greatest_modified_value) range overlaps
 * int8range($2, $3).
 *
 * The context argument is not used; it is part of the common
 * create_statement callback signature.
 *
 * Returns the buffer owned by a freshly initialized StringInfo.
 */
static char *
create_materialization_ranges_pending_statement(MaterializationContext *context)
{
	static const char *const pending_ranges_sql =
		"SELECT * "
		"FROM _timescaledb_catalog.continuous_aggs_materialization_ranges "
		"WHERE materialization_id = $1 "
		"AND pg_catalog.int8range(lowest_modified_value, greatest_modified_value) && "
		"pg_catalog.int8range($2, $3) "
		"LIMIT 1 ";
	StringInfoData buf;

	initStringInfo(&buf);
	appendStringInfo(&buf, "%s", pending_ranges_sql);

	return buf.data;
}

static void
emit_materialization_insert_error(MaterializationContext *context)
{
Expand Down Expand Up @@ -612,6 +663,15 @@ emit_materialization_ranges_delete_error(MaterializationContext *context)
NameStr(*context->materialization_table.name));
}

/*
 * Raise an ERROR for a failed "pending ranges" lookup.
 *
 * The pending-ranges check can be run with a context that only carries the
 * continuous aggregate and the range, leaving materialization_table.schema
 * and materialization_table.name as NULL pointers. Dereferencing them here
 * would crash the backend instead of reporting the failure, so fall back to
 * identifying the materialization hypertable by id in that case.
 */
static void
emit_materialization_ranges_pending_error(MaterializationContext *context)
{
	if (context->materialization_table.schema == NULL ||
		context->materialization_table.name == NULL)
	{
		elog(ERROR,
			 "could not select pending materialization ranges for materialization hypertable %d",
			 context->cagg->data.mat_hypertable_id);
	}
	else
	{
		elog(ERROR,
			 "could not select pending materialization ranges \"%s.%s\"",
			 NameStr(*context->materialization_table.schema),
			 NameStr(*context->materialization_table.name));
	}
}

static void
emit_materialization_insert_progress(MaterializationContext *context, uint64 rows_processed)
{
Expand Down Expand Up @@ -651,7 +711,8 @@ create_materialization_plan_argtypes(MaterializationContext *context,
switch (plan_type)
{
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
argtypes[0] = INT4OID; /* materialization_id */
case PLAN_TYPE_RANGES_PENDING:
argtypes[0] = INT4OID; /* materialization_id */
argtypes[1] = INT8OID;
argtypes[2] = INT8OID;
break;
Expand Down Expand Up @@ -703,6 +764,7 @@ create_materialization_plan_args(MaterializationContext *context, Materializatio
switch (plan_type)
{
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
case PLAN_TYPE_RANGES_PENDING:
{
(*values)[0] = Int32GetDatum(context->cagg->data.mat_hypertable_id);
(*values)[1] = Int64GetDatum(context->internal_materialization_range.start);
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/continuous_aggs/materialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousA
SchemaAndName materialization_table,
const NameData *time_column_name,
InternalTimeRange materialization_range, int32 chunk_id);
/*
 * Report whether the continuous aggregate has pending materialization ranges
 * overlapping materialization_range (given in internal time representation).
 */
bool continuous_agg_has_pending_materializations(const ContinuousAgg *cagg,
InternalTimeRange materialization_range);
36 changes: 31 additions & 5 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
context,
INVALID_CHUNK_ID,
force))
bool refreshed = process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
context,
INVALID_CHUNK_ID,
force);

bool has_pending_materializations =
continuous_agg_has_pending_materializations(cagg, refresh_window);

if (has_pending_materializations)
{
ContinuousAggRefreshState refresh;
continuous_agg_refresh_init(&refresh, cagg, &refresh_window);

InternalTimeRange invalidation = {
.type = refresh_window.type,
.start = refresh_window.start,
/* Invalidations are inclusive at the end, while refresh windows
* aren't, so add one to the end of the invalidated region */
.end = ts_time_saturating_add(refresh_window.end, 1, refresh_window.type),
};

InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(cagg,
&invalidation,
cagg->bucket_function);

continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, INVALID_CHUNK_ID);
}

if (!refreshed && !has_pending_materializations)
emit_up_to_date_notice(cagg, context);

/* Restore search_path */
Expand Down
97 changes: 95 additions & 2 deletions tsl/test/isolation/expected/cagg_concurrent_refresh.out
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Parsed test spec with 12 sessions
Parsed test spec with 14 sessions

starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
Expand Down Expand Up @@ -356,7 +356,6 @@ step L3_unlock_cagg_table:
ROLLBACK;

step R1_refresh: <... completed>
R2: NOTICE: continuous aggregate "cond_10" is already up-to-date
step R2_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
Expand Down Expand Up @@ -675,3 +674,97 @@ conditions | 100
conditions2|-2147483648
(2 rows)


starting permutation: WP_enable R6_materialization_ranges R1_refresh K1_killpid R6_materialization_ranges WP_release R13_refresh R6_materialization_ranges
R5: LOG: statement:
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
SET SESSION client_min_messages = 'DEBUG1';

L1: WARNING: there is already a transaction in progress
L2: WARNING: there is already a transaction in progress
L3: WARNING: there is already a transaction in progress
step WP_enable:
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_enable
----------------------

(1 row)

step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
(0 rows)

step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step K1_killpid:
CALL killpids();
<waiting ...>
step R1_refresh: <... completed>
FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
cond_10 | 30| 70
(1 row)

step K1_killpid: <... completed>
step WP_release:
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_release
-----------------------

(1 row)

step R13_refresh:
CALL refresh_continuous_aggregate('cond_10', 25, 70);

step R6_materialization_ranges:
SELECT
c.user_view_name,
m.lowest_modified_value,
m.greatest_modified_value
FROM
_timescaledb_catalog.continuous_aggs_materialization_ranges m
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
WHERE
c.user_view_name = 'cond_10'
ORDER BY
1, 2, 3;

user_view_name|lowest_modified_value|greatest_modified_value
--------------+---------------------+-----------------------
(0 rows)

38 changes: 36 additions & 2 deletions tsl/test/isolation/specs/cagg_concurrent_refresh.spec
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ setup
INTO mattable;
EXECUTE format('LOCK table %s IN ROW EXCLUSIVE MODE', mattable);
END; $$ LANGUAGE plpgsql;

-- PIDs of the backends that killpids() should terminate; one row per backend.
CREATE TABLE killpid (
pid INTEGER NOT NULL PRIMARY KEY
);
-- Terminate every backend listed in killpid, wait until none of them is
-- reported by pg_stat_activity anymore, then empty the table.
CREATE OR REPLACE PROCEDURE killpids() AS
$$
BEGIN
PERFORM pg_terminate_backend(pid) FROM killpid;
-- Poll in 10 ms steps until the listed backends have disappeared.
WHILE EXISTS (SELECT FROM pg_stat_activity WHERE pid IN (SELECT pid FROM killpid))
LOOP
PERFORM pg_sleep(0.01);
END LOOP;
DELETE FROM killpid;
END;
$$ LANGUAGE plpgsql;
}

# Move the invalidation threshold so that we can generate some
Expand Down Expand Up @@ -149,6 +164,7 @@ setup
teardown {
DROP TABLE conditions CASCADE;
DROP TABLE conditions2 CASCADE;
DROP TABLE killpid;
}

# Waitpoint for cagg invalidation logs
Expand All @@ -168,6 +184,8 @@ setup
{
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
INSERT INTO killpid VALUES (pg_backend_pid())
ON CONFLICT (pid) DO NOTHING;
}
step "R1_refresh"
{
Expand All @@ -185,6 +203,12 @@ step "R12_refresh"
CALL refresh_continuous_aggregate('cond2_10', 25, 70);
}

# Refresh over the same window as R1_refresh, issued from its own session
session "R13"
step "R13_refresh"
{
CALL refresh_continuous_aggregate('cond_10', 25, 70);
}

# Refresh that overlaps with R1
session "R2"
setup
Expand Down Expand Up @@ -336,6 +360,12 @@ step "S1_select"
ORDER BY 1;
}

# Session that terminates the backends registered in the killpid table
session "K1"
step "K1_killpid"
{
CALL killpids();
}

####################################################################
#
# Tests for concurrent updates to the invalidation threshold (first
Expand Down Expand Up @@ -385,6 +415,10 @@ permutation "L3_lock_cagg_table" "R3_refresh" "R4_refresh" "L3_unlock_cagg_table
# block each other
permutation "R1_refresh" "R12_refresh"

# CAgg invalidation logs processing skipping locks due to
# the concurrent execution
# CAgg invalidation log processing (in a separate transaction) and the
# materialization transaction can be executed concurrently
permutation "WP_enable" "R1_refresh"("WP_enable") "R6_materialization_ranges" "R5_refresh"("WP_enable") "R6_materialization_ranges" "WP_release" "R6_materialization_ranges" "S1_select"

# CAgg materialization phase (third transaction of the refresh procedure) is terminated by another session;
# refreshing again must then process the pending ranges
permutation "WP_enable" "R6_materialization_ranges" "R1_refresh"("WP_enable") "K1_killpid"("R1_refresh") "R6_materialization_ranges" "WP_release" "R13_refresh"("K1_killpid") "R6_materialization_ranges"
Loading