Skip to content

Commit 8956d9e

Browse files
committed
Add changelog entries
1 parent 165f411 commit 8956d9e

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

Changelog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
1111
#### Bug fixes
1212

1313
#### Other notable changes
14+
- [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Support specifying a range to `duration_in` to specify a time range to get states in for state aggregates
15+
- [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Removed `next` parameter from interpolated state aggregate functions
16+
- [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Renamed `state_agg` to `compressed_state_agg` and `timeline_agg` to `state_agg`
1417

1518
#### Shout-outs
1619

extension/src/state_aggregate.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ pub mod toolkit_experimental {
306306

307307
// update combined_durations
308308
if let Some(combined_durations) = combined_durations.as_mut() {
309+
warning!("updating combined durations");
309310
// extend last duration
310311
let first_cd = combined_durations
311312
.first_mut()
@@ -345,15 +346,15 @@ pub mod toolkit_experimental {
345346
let last_interval = interval_start + interval_len - self.last_time;
346347
match durations.get_mut(self.last_state as usize) {
347348
None => {
348-
pgx::error!("poorly formed CompressedStateAgg, last_state out of starts")
349+
pgx::error!("poorly formed state aggregate, last_state out of starts")
349350
}
350351
Some(dis) => {
351352
dis.duration += last_interval;
352353
if let Some(combined_durations) = combined_durations.as_mut() {
353354
// extend last duration
354355
combined_durations
355356
.last_mut()
356-
.expect("poorly formed StateAgg, length mismatch")
357+
.expect("poorly formed state aggregate, length mismatch")
357358
.end_time += last_interval;
358359
};
359360
Record {
@@ -738,23 +739,24 @@ impl CompressedStateAggTransState {
738739
fn duration_in_inner<'a>(
739740
state: Option<StateEntry>,
740741
aggregate: Option<CompressedStateAgg<'a>>,
741-
range: Option<(TimestampTz, Option<crate::raw::Interval>)>,
742+
range: Option<(i64, Option<i64>)>, // start and interval
742743
) -> crate::raw::Interval {
744+
warning!("duration_in_inner - range={:?}", range);
743745
let time: i64 = if let Some((start, interval)) = range {
744-
let (start, end) = if let Some(interval) = interval {
745-
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
746+
let end = if let Some(interval) = interval {
746747
assert!(interval >= 0, "Interval must not be negative");
747-
let start = start.into();
748-
(start, start + interval)
748+
start + interval
749749
} else {
750-
(start.into(), i64::MAX)
750+
i64::MAX
751751
};
752752
assert!(end >= start, "End time must be after start time");
753753
if let (Some(state), Some(agg)) = (state, aggregate) {
754754
let state = state.materialize(agg.states_as_str());
755755
let mut total = 0;
756756
for tis in agg.combined_durations.iter() {
757+
warning!("evaluating TIS - range={:?}, tis={:?}", range, tis);
757758
if tis.state.materialize(agg.states_as_str()) == state {
759+
warning!("evaluating matching TIS - range={:?}, tis={:?}", range, tis);
758760
let tis_start_time = i64::max(tis.start_time, start);
759761
let tis_end_time = i64::min(tis.end_time, end);
760762
if tis_end_time >= start && tis_start_time <= end {
@@ -872,6 +874,8 @@ pub fn duration_in_range<'a>(
872874
aggregate.assert_str()
873875
};
874876
let aggregate = aggregate.map(StateAgg::as_compressed_state_agg);
877+
let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval));
878+
let start = start.into();
875879
duration_in_inner(
876880
aggregate.as_ref().and_then(|aggregate| {
877881
StateEntry::try_from_existing_str(aggregate.states_as_str(), &state)
@@ -896,6 +900,8 @@ pub fn duration_in_range_int<'a>(
896900
if let Some(ref aggregate) = aggregate {
897901
aggregate.assert_int()
898902
};
903+
let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval));
904+
let start = start.into();
899905
duration_in_inner(
900906
Some(StateEntry::from_integer(state)),
901907
aggregate.map(StateAgg::as_compressed_state_agg),
@@ -916,10 +922,11 @@ fn interpolated_duration_in_inner<'a>(
916922
),
917923
Some(aggregate) => {
918924
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
919-
let new_agg = aggregate.interpolate(start.into(), interval, prev);
925+
let start = start.into();
926+
let new_agg = aggregate.interpolate(start, interval, prev);
920927
let state_entry =
921928
state.and_then(|state| state.try_existing_entry(new_agg.states_as_str()));
922-
duration_in_inner(state_entry, Some(new_agg), None)
929+
duration_in_inner(state_entry, Some(new_agg), Some((start, Some(interval))))
923930
}
924931
}
925932
}

0 commit comments

Comments
 (0)