Skip to content

Commit 8bf1d9d

Browse files
committed
Draft PR
1 parent 345a0f0 commit 8bf1d9d

File tree

5 files changed

+226
-8
lines changed

5 files changed

+226
-8
lines changed

tsl/src/continuous_aggs/materialize.c

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ typedef enum MaterializationPlanType
5252
PLAN_TYPE_MERGE_DELETE,
5353
PLAN_TYPE_RANGES_SELECT,
5454
PLAN_TYPE_RANGES_DELETE,
55+
PLAN_TYPE_RANGES_PENDING,
5556
_MAX_MATERIALIZATION_PLAN_TYPES
5657
} MaterializationPlanType;
5758

@@ -91,13 +92,15 @@ static char *create_materialization_merge_statement(MaterializationContext *cont
9192
static char *create_materialization_merge_delete_statement(MaterializationContext *context);
9293
static char *create_materialization_ranges_select_statement(MaterializationContext *context);
9394
static char *create_materialization_ranges_delete_statement(MaterializationContext *context);
95+
static char *create_materialization_ranges_pending_statement(MaterializationContext *context);
9496

9597
static void emit_materialization_insert_error(MaterializationContext *context);
9698
static void emit_materialization_delete_error(MaterializationContext *context);
9799
static void emit_materialization_exists_error(MaterializationContext *context);
98100
static void emit_materialization_merge_error(MaterializationContext *context);
99101
static void emit_materialization_ranges_select_error(MaterializationContext *context);
100102
static void emit_materialization_ranges_delete_error(MaterializationContext *context);
103+
static void emit_materialization_ranges_pending_error(MaterializationContext *context);
101104

102105
static void emit_materialization_insert_progress(MaterializationContext *context,
103106
uint64 rows_processed);
@@ -137,6 +140,11 @@ static MaterializationPlan materialization_plans[_MAX_MATERIALIZATION_PLAN_TYPES
137140
.create_statement =
138141
create_materialization_ranges_delete_statement,
139142
.emit_error = emit_materialization_ranges_delete_error },
143+
[PLAN_TYPE_RANGES_PENDING] = { .read_only = true,
144+
.nargs = 3,
145+
.create_statement =
146+
create_materialization_ranges_pending_statement,
147+
.emit_error = emit_materialization_ranges_pending_error },
140148
};
141149

142150
static Oid *create_materialization_plan_argtypes(MaterializationContext *context,
@@ -201,6 +209,32 @@ continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *c
201209
AtEOXact_GUC(false, save_nestlevel);
202210
}
203211

212+
/* API to check for pending materialization ranges */
213+
bool
214+
continuous_agg_pending_materializations(const ContinuousAgg *cagg,
215+
InternalTimeRange materialization_range)
216+
{
217+
MaterializationContext context = {
218+
.cagg = cagg,
219+
.internal_materialization_range = materialization_range,
220+
};
221+
222+
/* Lock down search_path */
223+
int save_nestlevel = NewGUCNestLevel();
224+
RestrictSearchPath();
225+
226+
if (materialization_range.start > materialization_range.end)
227+
materialization_range.start = materialization_range.end;
228+
229+
bool has_pending_materializations =
230+
(execute_materialization_plan(&context, PLAN_TYPE_RANGES_PENDING) > 0);
231+
232+
/* Restore search_path */
233+
AtEOXact_GUC(false, save_nestlevel);
234+
235+
return has_pending_materializations;
236+
}
237+
204238
static Datum
205239
time_range_internal_to_min_time_value(Oid type)
206240
{
@@ -558,6 +592,23 @@ create_materialization_ranges_delete_statement(MaterializationContext *context)
558592
return query.data;
559593
}
560594

595+
static char *
596+
create_materialization_ranges_pending_statement(MaterializationContext *context)
597+
{
598+
StringInfoData query;
599+
initStringInfo(&query);
600+
601+
appendStringInfo(&query,
602+
"SELECT * "
603+
"FROM _timescaledb_catalog.continuous_aggs_materialization_ranges "
604+
"WHERE materialization_id = $1 "
605+
"AND pg_catalog.int8range(lowest_modified_value, greatest_modified_value) && "
606+
"pg_catalog.int8range($2, $3) "
607+
"LIMIT 1 ");
608+
609+
return query.data;
610+
}
611+
561612
static void
562613
emit_materialization_insert_error(MaterializationContext *context)
563614
{
@@ -612,6 +663,15 @@ emit_materialization_ranges_delete_error(MaterializationContext *context)
612663
NameStr(*context->materialization_table.name));
613664
}
614665

666+
static void
667+
emit_materialization_ranges_pending_error(MaterializationContext *context)
668+
{
669+
elog(ERROR,
670+
"could not select pending materialization ranges \"%s.%s\"",
671+
NameStr(*context->materialization_table.schema),
672+
NameStr(*context->materialization_table.name));
673+
}
674+
615675
static void
616676
emit_materialization_insert_progress(MaterializationContext *context, uint64 rows_processed)
617677
{
@@ -651,7 +711,8 @@ create_materialization_plan_argtypes(MaterializationContext *context,
651711
switch (plan_type)
652712
{
653713
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
654-
argtypes[0] = INT4OID; /* materialization_id */
714+
case PLAN_TYPE_RANGES_PENDING:
715+
argtypes[0] = INT4OID; /* materialization_id */
655716
argtypes[1] = INT8OID;
656717
argtypes[2] = INT8OID;
657718
break;
@@ -703,6 +764,7 @@ create_materialization_plan_args(MaterializationContext *context, Materializatio
703764
switch (plan_type)
704765
{
705766
case PLAN_TYPE_RANGES_SELECT: /* 3 arguments */
767+
case PLAN_TYPE_RANGES_PENDING:
706768
{
707769
(*values)[0] = Int32GetDatum(context->cagg->data.mat_hypertable_id);
708770
(*values)[1] = Int64GetDatum(context->internal_materialization_range.start);

tsl/src/continuous_aggs/materialize.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,5 @@ void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousA
4242
SchemaAndName materialization_table,
4343
const NameData *time_column_name,
4444
InternalTimeRange materialization_range, int32 chunk_id);
45+
bool continuous_agg_pending_materializations(const ContinuousAgg *cagg,
46+
InternalTimeRange materialization_range);

tsl/src/continuous_aggs/refresh.c

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -919,11 +919,40 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
919919

920920
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);
921921

922-
if (!process_cagg_invalidations_and_refresh(cagg,
923-
&refresh_window,
924-
context,
925-
INVALID_CHUNK_ID,
926-
force))
922+
bool refreshed = process_cagg_invalidations_and_refresh(cagg,
923+
&refresh_window,
924+
context,
925+
INVALID_CHUNK_ID,
926+
force);
927+
928+
bool has_pending_materializations =
929+
continuous_agg_pending_materializations(cagg, refresh_window);
930+
931+
if (!refreshed && !has_pending_materializations)
932+
emit_up_to_date_notice(cagg, context);
933+
934+
if (has_pending_materializations)
935+
{
936+
ContinuousAggRefreshState refresh;
937+
continuous_agg_refresh_init(&refresh, cagg, &refresh_window);
938+
939+
InternalTimeRange invalidation = {
940+
.type = refresh_window.type,
941+
.start = DatumGetInt64(refresh_window.start),
942+
/* Invalidations are inclusive at the end, while refresh windows
943+
* aren't, so add one to the end of the invalidated region */
944+
.end =
945+
ts_time_saturating_add(DatumGetInt64(refresh_window.end), 1, refresh_window.type),
946+
};
947+
948+
InternalTimeRange bucketed_refresh_window =
949+
compute_circumscribed_bucketed_refresh_window(cagg,
950+
&invalidation,
951+
cagg->bucket_function);
952+
953+
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, INVALID_CHUNK_ID);
954+
}
955+
else
927956
emit_up_to_date_notice(cagg, context);
928957

929958
/* Restore search_path */

tsl/test/isolation/expected/cagg_concurrent_refresh.out

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Parsed test spec with 12 sessions
1+
Parsed test spec with 14 sessions
22

33
starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
44
step R1_refresh:
@@ -356,7 +356,6 @@ step L3_unlock_cagg_table:
356356
ROLLBACK;
357357

358358
step R1_refresh: <... completed>
359-
R2: NOTICE: continuous aggregate "cond_10" is already up-to-date
360359
step R2_refresh: <... completed>
361360
step S1_select:
362361
SELECT bucket, avg_temp
@@ -675,3 +674,97 @@ conditions | 100
675674
conditions2|-2147483648
676675
(2 rows)
677676

677+
678+
starting permutation: WP_enable R6_materialization_ranges R1_refresh K1_killpid R6_materialization_ranges WP_release R13_refresh R6_materialization_ranges
679+
R5: LOG: statement:
680+
SET SESSION lock_timeout = '500ms';
681+
SET SESSION deadlock_timeout = '500ms';
682+
SET SESSION client_min_messages = 'DEBUG1';
683+
684+
L1: WARNING: there is already a transaction in progress
685+
L2: WARNING: there is already a transaction in progress
686+
L3: WARNING: there is already a transaction in progress
687+
step WP_enable:
688+
SELECT debug_waitpoint_enable('after_process_cagg_invalidations_for_refresh_lock');
689+
690+
debug_waitpoint_enable
691+
----------------------
692+
693+
(1 row)
694+
695+
step R6_materialization_ranges:
696+
SELECT
697+
c.user_view_name,
698+
m.lowest_modified_value,
699+
m.greatest_modified_value
700+
FROM
701+
_timescaledb_catalog.continuous_aggs_materialization_ranges m
702+
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
703+
WHERE
704+
c.user_view_name = 'cond_10'
705+
ORDER BY
706+
1, 2, 3;
707+
708+
user_view_name|lowest_modified_value|greatest_modified_value
709+
--------------+---------------------+-----------------------
710+
(0 rows)
711+
712+
step R1_refresh:
713+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
714+
<waiting ...>
715+
step K1_killpid:
716+
CALL killpids();
717+
<waiting ...>
718+
step R1_refresh: <... completed>
719+
FATAL: terminating connection due to administrator command
720+
server closed the connection unexpectedly
721+
This probably means the server terminated abnormally
722+
before or while processing the request.
723+
724+
step R6_materialization_ranges:
725+
SELECT
726+
c.user_view_name,
727+
m.lowest_modified_value,
728+
m.greatest_modified_value
729+
FROM
730+
_timescaledb_catalog.continuous_aggs_materialization_ranges m
731+
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
732+
WHERE
733+
c.user_view_name = 'cond_10'
734+
ORDER BY
735+
1, 2, 3;
736+
737+
user_view_name|lowest_modified_value|greatest_modified_value
738+
--------------+---------------------+-----------------------
739+
cond_10 | 30| 70
740+
(1 row)
741+
742+
step K1_killpid: <... completed>
743+
step WP_release:
744+
SELECT debug_waitpoint_release('after_process_cagg_invalidations_for_refresh_lock');
745+
746+
debug_waitpoint_release
747+
-----------------------
748+
749+
(1 row)
750+
751+
step R13_refresh:
752+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
753+
754+
step R6_materialization_ranges:
755+
SELECT
756+
c.user_view_name,
757+
m.lowest_modified_value,
758+
m.greatest_modified_value
759+
FROM
760+
_timescaledb_catalog.continuous_aggs_materialization_ranges m
761+
JOIN _timescaledb_catalog.continuous_agg c on c.mat_hypertable_id = m.materialization_id
762+
WHERE
763+
c.user_view_name = 'cond_10'
764+
ORDER BY
765+
1, 2, 3;
766+
767+
user_view_name|lowest_modified_value|greatest_modified_value
768+
--------------+---------------------+-----------------------
769+
(0 rows)
770+

tsl/test/isolation/specs/cagg_concurrent_refresh.spec

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,21 @@ setup
115115
INTO mattable;
116116
EXECUTE format('LOCK table %s IN ROW EXCLUSIVE MODE', mattable);
117117
END; $$ LANGUAGE plpgsql;
118+
119+
CREATE TABLE killpid (
120+
pid INTEGER NOT NULL PRIMARY KEY
121+
);
122+
CREATE OR REPLACE PROCEDURE killpids() AS
123+
$$
124+
BEGIN
125+
PERFORM pg_terminate_backend(pid) FROM killpid;
126+
WHILE EXISTS (SELECT FROM pg_stat_activity WHERE pid IN (SELECT pid FROM killpid))
127+
LOOP
128+
PERFORM pg_sleep(0.01);
129+
END LOOP;
130+
DELETE FROM killpid;
131+
END;
132+
$$ LANGUAGE plpgsql;
118133
}
119134

120135
# Move the invalidation threshold so that we can generate some
@@ -149,6 +164,7 @@ setup
149164
teardown {
150165
DROP TABLE conditions CASCADE;
151166
DROP TABLE conditions2 CASCADE;
167+
DROP TABLE killpid;
152168
}
153169

154170
# Waitpoint for cagg invalidation logs
@@ -168,6 +184,8 @@ setup
168184
{
169185
SET SESSION lock_timeout = '500ms';
170186
SET SESSION deadlock_timeout = '500ms';
187+
INSERT INTO killpid VALUES (pg_backend_pid())
188+
ON CONFLICT (pid) DO NOTHING;
171189
}
172190
step "R1_refresh"
173191
{
@@ -185,6 +203,12 @@ step "R12_refresh"
185203
CALL refresh_continuous_aggregate('cond2_10', 25, 70);
186204
}
187205

206+
session "R13"
207+
step "R13_refresh"
208+
{
209+
CALL refresh_continuous_aggregate('cond_10', 25, 70);
210+
}
211+
188212
# Refresh that overlaps with R1
189213
session "R2"
190214
setup
@@ -336,6 +360,12 @@ step "S1_select"
336360
ORDER BY 1;
337361
}
338362

363+
session "K1"
364+
step "K1_killpid"
365+
{
366+
CALL killpids();
367+
}
368+
339369
####################################################################
340370
#
341371
# Tests for concurrent updates to the invalidation threshold (first
@@ -388,3 +418,5 @@ permutation "R1_refresh" "R12_refresh"
388418
# CAgg invalidation logs processing skipping locks due to
389419
# the concurrent execution
390420
permutation "WP_enable" "R1_refresh"("WP_enable") "R6_materialization_ranges" "R5_refresh"("WP_enable") "R6_materialization_ranges" "WP_release" "R6_materialization_ranges" "S1_select"
421+
422+
permutation "WP_enable" "R6_materialization_ranges" "R1_refresh"("WP_enable") "K1_killpid"("R1_refresh") "R6_materialization_ranges" "WP_release" "R13_refresh"("K1_killpid") "R6_materialization_ranges"

0 commit comments

Comments
 (0)