Skip to content

Commit c627db5

Browse files
committed
Support getting the state duration for a range
1 parent 45cba4f commit c627db5

File tree

2 files changed

+110
-3
lines changed

2 files changed

+110
-3
lines changed

docs/state_agg.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,40 @@ FROM states_test;
7777
3
7878
```
7979

80+
#### duration_in for a range
81+
```SQL
82+
SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.timeline_agg(ts, state), '2020-01-01 00:01:00+00', '2020-01-03 00:01:00+00') FROM states_test;
83+
```
84+
```output
85+
duration_in
86+
-------------
87+
00:00:57
88+
```
89+
```SQL
90+
SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.timeline_agg(ts, state), '2020-01-01 00:01:00+00') FROM states_test;
91+
```
92+
```output
93+
duration_in
94+
-------------
95+
00:00:57
96+
```
97+
```SQL
98+
SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.timeline_agg(ts, state), '2020-01-01 00:01:00+00', '2020-01-03 00:01:00+00') FROM states_test_4;
99+
```
100+
```output
101+
duration_in
102+
-------------
103+
00:00:57
104+
```
105+
```SQL
106+
SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.timeline_agg(ts, state), '2020-01-01 00:01:00+00') FROM states_test_4;
107+
```
108+
```output
109+
duration_in
110+
-------------
111+
00:00:57
112+
```
113+
80114
### into_values
81115

82116
```SQL

extension/src/state_aggregate.rs

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -737,8 +737,32 @@ impl StateAggTransState {
737737
fn duration_in_inner<'a>(
738738
state: Option<StateEntry>,
739739
aggregate: Option<StateAgg<'a>>,
740+
range: Option<(TimestampTz, TimestampTz)>,
740741
) -> crate::raw::Interval {
741-
let time: i64 = state.and_then(|state| aggregate?.get(state)).unwrap_or(0);
742+
let time: i64 = if let Some((start, end)) = range {
743+
let (start, end) = (start.into(), end.into());
744+
assert!(end >= start, "End time must be after start time");
745+
if let (Some(state), Some(agg)) = (state, aggregate) {
746+
let state = state.materialize(agg.states_as_str());
747+
let mut total = 0;
748+
for tis in agg.combined_durations.iter() {
749+
if tis.state.materialize(agg.states_as_str()) == state {
750+
let tis_start_time = i64::max(tis.start_time, start);
751+
let tis_end_time = i64::min(tis.end_time, end);
752+
if tis_end_time >= start && tis_start_time <= end {
753+
let amount = tis_end_time - tis_start_time;
754+
assert!(amount >= 0, "incorrectly ordered times");
755+
total += amount;
756+
}
757+
}
758+
}
759+
total
760+
} else {
761+
0
762+
}
763+
} else {
764+
state.and_then(|state| aggregate?.get(state)).unwrap_or(0)
765+
};
742766
let interval = pg_sys::Interval {
743767
time,
744768
..Default::default()
@@ -771,6 +795,7 @@ pub fn duration_in<'a>(state: String, aggregate: Option<StateAgg<'a>>) -> crate:
771795
StateEntry::try_from_existing_str(aggregate.states_as_str(), &state)
772796
}),
773797
aggregate,
798+
None,
774799
)
775800
}
776801

@@ -784,7 +809,7 @@ pub fn duration_in_int<'a>(state: i64, aggregate: Option<StateAgg<'a>>) -> crate
784809
if let Some(ref aggregate) = aggregate {
785810
aggregate.assert_int()
786811
};
787-
duration_in_inner(Some(StateEntry::from_integer(state)), aggregate)
812+
duration_in_inner(Some(StateEntry::from_integer(state)), aggregate, None)
788813
}
789814

790815
#[pg_extern(
@@ -819,6 +844,54 @@ pub fn duration_in_tl_int<'a>(
819844
duration_in_inner(
820845
Some(StateEntry::from_integer(state)),
821846
aggregate.map(TimelineAgg::as_state_agg),
847+
None,
848+
)
849+
}
850+
851+
#[pg_extern(
852+
immutable,
853+
parallel_safe,
854+
name = "duration_in",
855+
schema = "toolkit_experimental"
856+
)]
857+
pub fn duration_in_range<'a>(
858+
state: String,
859+
aggregate: Option<TimelineAgg<'a>>,
860+
start: TimestampTz,
861+
end: default!(TimestampTz, "'infinity'"),
862+
) -> crate::raw::Interval {
863+
if let Some(ref aggregate) = aggregate {
864+
aggregate.assert_str()
865+
};
866+
let aggregate = aggregate.map(TimelineAgg::as_state_agg);
867+
duration_in_inner(
868+
aggregate.as_ref().and_then(|aggregate| {
869+
StateEntry::try_from_existing_str(aggregate.states_as_str(), &state)
870+
}),
871+
aggregate,
872+
Some((start, end)),
873+
)
874+
}
875+
876+
#[pg_extern(
877+
immutable,
878+
parallel_safe,
879+
name = "duration_in",
880+
schema = "toolkit_experimental"
881+
)]
882+
pub fn duration_in_range_int<'a>(
883+
state: i64,
884+
aggregate: Option<TimelineAgg<'a>>,
885+
start: TimestampTz,
886+
end: default!(TimestampTz, "'infinity'"),
887+
) -> crate::raw::Interval {
888+
if let Some(ref aggregate) = aggregate {
889+
aggregate.assert_int()
890+
};
891+
duration_in_inner(
892+
Some(StateEntry::from_integer(state)),
893+
aggregate.map(TimelineAgg::as_state_agg),
894+
Some((start, end)),
822895
)
823896
}
824897

@@ -839,7 +912,7 @@ fn interpolated_duration_in_inner<'a>(
839912
let new_agg = aggregate.interpolate(start.into(), interval, prev, next.is_some());
840913
let state_entry =
841914
state.and_then(|state| state.try_existing_entry(new_agg.states_as_str()));
842-
duration_in_inner(state_entry, Some(new_agg))
915+
duration_in_inner(state_entry, Some(new_agg), None)
843916
}
844917
}
845918
}

0 commit comments

Comments
 (0)