Skip to content
51 changes: 42 additions & 9 deletions extension/src/state_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub mod toolkit_experimental {
state: self.durations.as_slice()[self.first_state as usize]
.state
.materialize(&states),
time: self.first_time,
time: interval_start,
},
};

Expand Down Expand Up @@ -738,19 +738,28 @@ impl CompressedStateAggTransState {
fn duration_in_inner<'a>(
state: Option<StateEntry>,
aggregate: Option<CompressedStateAgg<'a>>,
range: Option<(TimestampTz, Option<crate::raw::Interval>)>,
range: Option<(i64, Option<i64>)>, // start and interval
) -> crate::raw::Interval {
let time: i64 = if let Some((start, interval)) = range {
let (start, end) = if let Some(interval) = interval {
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
let end = if let Some(interval) = interval {
assert!(interval >= 0, "Interval must not be negative");
let start = start.into();
(start, start + interval)
start + interval
} else {
(start.into(), i64::MAX)
i64::MAX
};
assert!(end >= start, "End time must be after start time");
if let (Some(state), Some(agg)) = (state, aggregate) {
assert!(
!agg.0.compressed,
"unreachable: interval specified for compressed aggregate"
);
assert!(
start >= agg.0.first_time,
"Start time ({}) cannot be before first state ({})",
start,
agg.0.first_time,
);

let state = state.materialize(agg.states_as_str());
let mut total = 0;
for tis in agg.combined_durations.iter() {
Expand Down Expand Up @@ -872,6 +881,8 @@ pub fn duration_in_range<'a>(
aggregate.assert_str()
};
let aggregate = aggregate.map(StateAgg::as_compressed_state_agg);
let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval));
let start = start.into();
duration_in_inner(
aggregate.as_ref().and_then(|aggregate| {
StateEntry::try_from_existing_str(aggregate.states_as_str(), &state)
Expand All @@ -896,6 +907,8 @@ pub fn duration_in_range_int<'a>(
if let Some(ref aggregate) = aggregate {
aggregate.assert_int()
};
let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval));
let start = start.into();
duration_in_inner(
Some(StateEntry::from_integer(state)),
aggregate.map(StateAgg::as_compressed_state_agg),
Expand All @@ -916,10 +929,30 @@ fn interpolated_duration_in_inner<'a>(
),
Some(aggregate) => {
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
let new_agg = aggregate.interpolate(start.into(), interval, prev);
let start = start.into();
if let Some(ref prev) = prev {
assert!(
start >= prev.0.last_time,
"Start time cannot be before last state of previous aggregate"
);
};
let range = if aggregate.compressed {
assert!(
start <= aggregate.first_time,
"For compressed state aggregates, the start cannot be after the first state"
);
assert!(
(start + interval) >= aggregate.last_time,
"For compressed state aggregates, the time range cannot be after the last state"
);
None
} else {
Some((start, Some(interval)))
};
let new_agg = aggregate.interpolate(start, interval, prev);
let state_entry =
state.and_then(|state| state.try_existing_entry(new_agg.states_as_str()));
duration_in_inner(state_entry, Some(new_agg), None)
duration_in_inner(state_entry, Some(new_agg), range)
}
}
}
Expand Down