Skip to content

Commit 3fdd739

Browse files
committed
Remove merged Continuous Aggregate materialization
Historically if we have more than `timescaledb.materializations_per_refresh_window` we merge those ranges into a merged range based on the `min-start` and `max-end` even if there's holes in between that can potentially lead to unnecessary re-materializations in the underlying materialization hypertable. Now with new features like incremental and/or concurrent policy refresh this implementation is useless so let's remove it to keep the code base simpler and avoid unnecessary re-materializations.
1 parent 16dd264 commit 3fdd739

File tree

6 files changed

+38
-137
lines changed

6 files changed

+38
-137
lines changed

tsl/src/continuous_aggs/invalidation.c

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,16 +1067,13 @@ invalidation_process_hypertable_log(int32 hypertable_id, Oid dimtype)
10671067

10681068
InvalidationStore *
10691069
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
1070-
long max_materializations, bool *do_merged_refresh,
1071-
InternalTimeRange *ret_merged_refresh_window,
1072-
ContinuousAggRefreshContext context, bool force)
1070+
long max_materializations, ContinuousAggRefreshContext context,
1071+
bool force)
10731072
{
10741073
ContinuousAggInvalidationState state;
10751074
InvalidationStore *store = NULL;
10761075
long count;
10771076

1078-
*do_merged_refresh = false;
1079-
10801077
cagg_invalidation_state_init(&state, cagg);
10811078
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
10821079
clear_cagg_invalidations_for_refresh(&state, refresh_window, force);
@@ -1095,25 +1092,6 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange
10951092

10961093
cagg_invalidation_state_cleanup(&state);
10971094

1098-
/*
1099-
* If there are many individual invalidation ranges to refresh, then
1100-
* revert to a merged refresh across the range decided by lowest and
1101-
* highest invalidated value.
1102-
*/
1103-
if (count && tuplestore_tuple_count(store->tupstore) > max_materializations)
1104-
{
1105-
InternalTimeRange merged_refresh_window;
1106-
continuous_agg_calculate_merged_refresh_window(cagg,
1107-
refresh_window,
1108-
store,
1109-
&merged_refresh_window,
1110-
context);
1111-
*do_merged_refresh = true;
1112-
*ret_merged_refresh_window = merged_refresh_window;
1113-
invalidation_store_free(store);
1114-
store = NULL;
1115-
}
1116-
11171095
return store;
11181096
}
11191097

tsl/src/continuous_aggs/invalidation.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ extern void continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hyp
4444
extern Datum continuous_agg_process_hypertable_invalidations(PG_FUNCTION_ARGS);
4545
extern void invalidation_process_hypertable_log(int32 hypertable_id, Oid dimtype);
4646

47-
extern InvalidationStore *
48-
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
49-
long max_materializations, bool *do_merged_refresh,
50-
InternalTimeRange *ret_merged_refresh_window,
51-
ContinuousAggRefreshContext context, bool force);
47+
extern InvalidationStore *invalidation_process_cagg_log(const ContinuousAgg *cagg,
48+
const InternalTimeRange *refresh_window,
49+
long max_materializations,
50+
ContinuousAggRefreshContext context,
51+
bool force);
5252

5353
extern void invalidation_store_free(InvalidationStore *store);
5454
extern void

tsl/src/continuous_aggs/refresh.c

Lines changed: 16 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,16 @@ static void continuous_agg_refresh_execute(const ContinuousAggRefreshState *refr
6565
const InternalTimeRange *bucketed_refresh_window,
6666
const int32 chunk_id);
6767
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
68-
const InternalTimeRange *refresh_window, const char *msg,
68+
const InternalTimeRange *refresh_window,
6969
ContinuousAggRefreshContext context);
7070
static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
7171
const ContinuousAggRefreshContext context,
7272
const long iteration, void *arg1_refresh,
7373
void *arg2_chunk_id);
74-
static void update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
75-
const ContinuousAggRefreshContext context,
76-
const long iteration, void *arg1_merged_refresh_window,
77-
void *arg2);
7874
static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
7975
const InternalTimeRange *refresh_window,
8076
const InvalidationStore *invalidations,
81-
int32 chunk_id, const bool do_merged_refresh,
82-
const InternalTimeRange merged_refresh_window,
77+
int32 chunk_id,
8378
const ContinuousAggRefreshContext context);
8479
static void emit_up_to_date_notice(const ContinuousAgg *cagg,
8580
const ContinuousAggRefreshContext context);
@@ -428,8 +423,9 @@ continuous_agg_refresh_execute(const ContinuousAggRefreshState *refresh,
428423

429424
static void
430425
log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
431-
const char *msg, ContinuousAggRefreshContext context)
426+
ContinuousAggRefreshContext context)
432427
{
428+
const char *msg = "continuous aggregate refresh (individual invalidation) on";
433429
if (context.callctx == CAGG_REFRESH_POLICY_BATCHED)
434430
elog(elevel,
435431
"%s \"%s\" in window [ %s, %s ] (batch %d of %d)",
@@ -463,34 +459,10 @@ continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh
463459
const int32 chunk_id = *(const int32 *) arg2_chunk_id;
464460
(void) iteration;
465461

466-
log_refresh_window(CAGG_REFRESH_LOG_LEVEL,
467-
&refresh->cagg,
468-
bucketed_refresh_window,
469-
"continuous aggregate refresh (individual invalidation) on",
470-
context);
462+
log_refresh_window(CAGG_REFRESH_LOG_LEVEL, &refresh->cagg, bucketed_refresh_window, context);
471463
continuous_agg_refresh_execute(refresh, bucketed_refresh_window, chunk_id);
472464
}
473465

474-
static void
475-
update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
476-
const ContinuousAggRefreshContext context, const long iteration,
477-
void *arg1_merged_refresh_window, void *arg2)
478-
{
479-
InternalTimeRange *merged_refresh_window = (InternalTimeRange *) arg1_merged_refresh_window;
480-
(void) arg2;
481-
482-
if (iteration == 0)
483-
*merged_refresh_window = *bucketed_refresh_window;
484-
else
485-
{
486-
if (bucketed_refresh_window->start < merged_refresh_window->start)
487-
merged_refresh_window->start = bucketed_refresh_window->start;
488-
489-
if (bucketed_refresh_window->end > merged_refresh_window->end)
490-
merged_refresh_window->end = bucketed_refresh_window->end;
491-
}
492-
}
493-
494466
static long
495467
continuous_agg_scan_refresh_window_ranges(const ContinuousAgg *cagg,
496468
const InternalTimeRange *refresh_window,
@@ -572,8 +544,6 @@ static void
572544
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
573545
const InternalTimeRange *refresh_window,
574546
const InvalidationStore *invalidations, int32 chunk_id,
575-
const bool do_merged_refresh,
576-
const InternalTimeRange merged_refresh_window,
577547
const ContinuousAggRefreshContext context)
578548
{
579549
ContinuousAggRefreshState refresh;
@@ -593,34 +563,15 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
593563
if (ContinuousAggIsFinalized(cagg))
594564
chunk_id = INVALID_CHUNK_ID;
595565

596-
if (do_merged_refresh)
597-
{
598-
Assert(merged_refresh_window.type == refresh_window->type);
599-
Assert(merged_refresh_window.start >= refresh_window->start);
600-
Assert((cagg->bucket_function->bucket_fixed_interval == false) ||
601-
(merged_refresh_window.end -
602-
ts_continuous_agg_fixed_bucket_width(cagg->bucket_function) <=
603-
refresh_window->end));
604-
605-
log_refresh_window(CAGG_REFRESH_LOG_LEVEL,
606-
cagg,
607-
&merged_refresh_window,
608-
"continuous aggregate refresh (merged invalidation) on",
609-
context);
610-
continuous_agg_refresh_execute(&refresh, &merged_refresh_window, chunk_id);
611-
}
612-
else
613-
{
614-
long count pg_attribute_unused();
615-
count = continuous_agg_scan_refresh_window_ranges(cagg,
616-
refresh_window,
617-
invalidations,
618-
context,
619-
continuous_agg_refresh_execute_wrapper,
620-
(void *) &refresh /* arg1 */,
621-
(void *) &chunk_id /* arg2 */);
622-
Assert(count);
623-
}
566+
long count pg_attribute_unused();
567+
count = continuous_agg_scan_refresh_window_ranges(cagg,
568+
refresh_window,
569+
invalidations,
570+
context,
571+
continuous_agg_refresh_execute_wrapper,
572+
(void *) &refresh /* arg1 */,
573+
(void *) &chunk_id /* arg2 */);
574+
Assert(count);
624575
}
625576

626577
#define REFRESH_FUNCTION_NAME "refresh_continuous_aggregate()"
@@ -701,24 +652,6 @@ emit_up_to_date_notice(const ContinuousAgg *cagg, const ContinuousAggRefreshCont
701652
}
702653
}
703654

704-
void
705-
continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
706-
const InternalTimeRange *refresh_window,
707-
const InvalidationStore *invalidations,
708-
InternalTimeRange *merged_refresh_window,
709-
const ContinuousAggRefreshContext context)
710-
{
711-
long count pg_attribute_unused();
712-
count = continuous_agg_scan_refresh_window_ranges(cagg,
713-
refresh_window,
714-
invalidations,
715-
context,
716-
update_merged_refresh_window,
717-
(void *) merged_refresh_window,
718-
NULL /* arg2 */);
719-
Assert(count);
720-
}
721-
722655
static bool
723656
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
724657
const InternalTimeRange *refresh_window,
@@ -727,8 +660,6 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
727660
{
728661
InvalidationStore *invalidations;
729662
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
730-
bool do_merged_refresh = false;
731-
InternalTimeRange merged_refresh_window;
732663

733664
/* Lock the continuous aggregate's materialized hypertable to protect
734665
* against concurrent refreshes. Only concurrent reads will be
@@ -742,12 +673,10 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
742673
invalidations = invalidation_process_cagg_log(cagg,
743674
refresh_window,
744675
ts_guc_cagg_max_individual_materializations,
745-
&do_merged_refresh,
746-
&merged_refresh_window,
747676
context,
748677
force);
749678

750-
if (invalidations != NULL || do_merged_refresh)
679+
if (invalidations != NULL)
751680
{
752681
if (context.callctx == CAGG_REFRESH_CREATION)
753682
{
@@ -758,13 +687,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
758687
"aggregate on creation.")));
759688
}
760689

761-
continuous_agg_refresh_with_window(cagg,
762-
refresh_window,
763-
invalidations,
764-
chunk_id,
765-
do_merged_refresh,
766-
merged_refresh_window,
767-
context);
690+
continuous_agg_refresh_with_window(cagg, refresh_window, invalidations, chunk_id, context);
768691
if (invalidations)
769692
invalidation_store_free(invalidations);
770693
return true;

tsl/test/expected/cagg_bgw-15.out

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -573,19 +573,19 @@ SELECT * FROM sorted_bgw_log;
573573
6 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
574574
0 | 86400000000 | DB Scheduler | [TESTING] Registered new background worker
575575
1 | 86400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
576-
0 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 8, 10 ] (batch 1 of 5)
576+
0 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 8, 10 ] (batch 1 of 5)
577577
1 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
578578
2 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
579-
3 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 6, 8 ] (batch 2 of 5)
579+
3 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 6, 8 ] (batch 2 of 5)
580580
4 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
581581
5 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
582-
6 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 4, 6 ] (batch 3 of 5)
582+
6 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 4, 6 ] (batch 3 of 5)
583583
7 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
584584
8 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
585-
9 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 2, 4 ] (batch 4 of 5)
585+
9 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 2, 4 ] (batch 4 of 5)
586586
10 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
587587
11 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
588-
12 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 0, 2 ] (batch 5 of 5)
588+
12 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 0, 2 ] (batch 5 of 5)
589589
13 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
590590
14 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
591591
(46 rows)

tsl/test/expected/cagg_bgw-16.out

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -573,19 +573,19 @@ SELECT * FROM sorted_bgw_log;
573573
6 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
574574
0 | 86400000000 | DB Scheduler | [TESTING] Registered new background worker
575575
1 | 86400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
576-
0 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 8, 10 ] (batch 1 of 5)
576+
0 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 8, 10 ] (batch 1 of 5)
577577
1 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
578578
2 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
579-
3 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 6, 8 ] (batch 2 of 5)
579+
3 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 6, 8 ] (batch 2 of 5)
580580
4 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
581581
5 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
582-
6 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 4, 6 ] (batch 3 of 5)
582+
6 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 4, 6 ] (batch 3 of 5)
583583
7 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
584584
8 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
585-
9 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 2, 4 ] (batch 4 of 5)
585+
9 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 2, 4 ] (batch 4 of 5)
586586
10 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
587587
11 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
588-
12 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 0, 2 ] (batch 5 of 5)
588+
12 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 0, 2 ] (batch 5 of 5)
589589
13 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
590590
14 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
591591
(46 rows)

0 commit comments

Comments
 (0)