Skip to content

Commit 85addbe

Browse files
committed
Stabilizing integral and interpolated integral
1 parent 27d77f6 commit 85addbe

File tree

7 files changed

+219
-49
lines changed

7 files changed

+219
-49
lines changed

Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
1313

1414
#### Stabilized features
1515
- [#722](https://github.com/timescale/timescaledb-toolkit/pull/722): Stabilize heartbeat aggregate.
16+
- [#724](https://github.com/timescale/timescaledb-toolkit/pull/724): Stabilize integral and interpolated_integral for time-weighted-average.
1617

1718
#### Other notable changes
1819
- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Add arrow operator support for counter aggregate and time-weighted aggregate interpolated accessors.

extension/src/accessors.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -551,32 +551,31 @@ pub fn accessor_unnest() -> AccessorUnnest<'static> {
551551
}
552552
}
553553

554-
#[pg_schema]
555-
pub mod toolkit_experimental {
556-
use super::*;
557-
558-
pg_type! {
559-
#[derive(Debug)]
560-
struct AccessorIntegral<'input> {
561-
len: u32,
562-
bytes: [u8; self.len],
563-
}
554+
pg_type! {
555+
#[derive(Debug)]
556+
struct AccessorIntegral<'input> {
557+
len: u32,
558+
bytes: [u8; self.len],
564559
}
560+
}
565561

566-
// FIXME string IO
567-
ron_inout_funcs!(AccessorIntegral);
562+
// FIXME string IO
563+
ron_inout_funcs!(AccessorIntegral);
568564

569-
#[pg_extern(immutable, parallel_safe, name = "integral")]
570-
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
571-
unsafe {
572-
flatten! {
573-
AccessorIntegral {
574-
len: unit.len().try_into().unwrap(),
575-
bytes: unit.as_bytes().into(),
576-
}
565+
#[pg_extern(immutable, parallel_safe, name = "integral")]
566+
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
567+
unsafe {
568+
flatten! {
569+
AccessorIntegral {
570+
len: unit.len().try_into().unwrap(),
571+
bytes: unit.as_bytes().into(),
577572
}
578573
}
579574
}
575+
}
576+
#[pg_schema]
577+
pub mod toolkit_experimental {
578+
use super::*;
580579

581580
pg_type! {
582581
#[derive(Debug)]

extension/src/duration.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
//! PostgreSQL parses duration units. Currently units longer than an hour are unsupported since
33
//! the length of days varies when in a timezone with daylight savings time.
44
5+
use core::fmt::{self, Formatter};
6+
57
// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60
68
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
79
pub enum DurationUnit {
@@ -14,7 +16,7 @@ pub enum DurationUnit {
1416
}
1517

1618
impl DurationUnit {
17-
fn microseconds(self) -> u32 {
19+
pub fn microseconds(self) -> u32 {
1820
match self {
1921
Self::Microsec => 1,
2022
Self::Millisec => 1000,
@@ -46,6 +48,18 @@ impl DurationUnit {
4648
}
4749
}
4850

51+
impl fmt::Display for DurationUnit {
52+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
53+
match self {
54+
DurationUnit::Microsec => write!(f, "microsecond"),
55+
DurationUnit::Millisec => write!(f, "millisecond"),
56+
DurationUnit::Second => write!(f, "second"),
57+
DurationUnit::Minute => write!(f, "minute"),
58+
DurationUnit::Hour => write!(f, "hour"),
59+
}
60+
}
61+
}
62+
4963
#[cfg(test)]
5064
mod test {
5165
use super::*;

extension/src/stabilization_info.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ crate::functions_stabilized_at! {
2424
interpolated_rate(timestamp with time zone,interval,countersummary,countersummary),
2525
timeweightinterpolatedaverageaccessor_in(cstring),
2626
timeweightinterpolatedaverageaccessor_out(timeweightinterpolatedaverageaccessor),
27+
accessorintegral_in(cstring),
28+
accessorintegral_out(accessorintegral),
29+
arrow_time_weighted_average_integral(timeweightsummary,accessorintegral),
30+
arrow_time_weighted_average_interpolated_integral(timeweightsummary,timeweightinterpolatedintegralaccessor),
31+
integral(text),
32+
integral(timeweightsummary,text),
33+
interpolated_integral(timestamp with time zone,interval,timeweightsummary,timeweightsummary,text),
34+
interpolated_integral(timeweightsummary,timestamp with time zone, interval,timeweightsummary,timeweightsummary,text),
35+
timeweightinterpolatedintegralaccessor_in(cstring),
36+
timeweightinterpolatedintegralaccessor_out(timeweightinterpolatedintegralaccessor),
2737
dead_ranges(heartbeatagg),
2838
downtime(heartbeatagg),
2939
heartbeat_agg(timestamp with time zone,timestamp with time zone,interval,interval),
@@ -557,6 +567,8 @@ crate::types_stabilized_at! {
557567
counterinterpolateddeltaaccessor,
558568
counterinterpolatedrateaccessor,
559569
timeweightinterpolatedaverageaccessor,
570+
timeweightinterpolatedintegralaccessor,
571+
accessorintegral,
560572
heartbeatagg,
561573
accessordeadranges,
562574
accessordowntime,
@@ -660,6 +672,8 @@ crate::operators_stabilized_at! {
660672
"->"(countersummary,counterinterpolateddeltaaccessor),
661673
"->"(countersummary,counterinterpolatedrateaccessor),
662674
"->"(timeweightsummary,timeweightinterpolatedaverageaccessor),
675+
"->"(timeweightsummary,timeweightinterpolatedintegralaccessor),
676+
"->"(timeweightsummary,accessorintegral),
663677
"->"(heartbeatagg,accessordeadranges),
664678
"->"(heartbeatagg,accessordowntime),
665679
"->"(heartbeatagg,accessorliveat),

extension/src/time_weighted_average.rs

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use serde::{Deserialize, Serialize};
55

66
use crate::{
77
accessors::{
8-
toolkit_experimental, AccessorAverage, AccessorFirstTime, AccessorFirstVal,
9-
AccessorLastTime, AccessorLastVal,
8+
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorIntegral, AccessorLastTime,
9+
AccessorLastVal,
1010
},
1111
aggregate_utils::in_aggregate_context,
1212
duration::DurationUnit,
@@ -25,7 +25,7 @@ use crate::raw::bytea;
2525

2626
mod accessors;
2727

28-
use accessors::TimeWeightInterpolatedAverageAccessor;
28+
use accessors::{TimeWeightInterpolatedAverageAccessor, TimeWeightInterpolatedIntegralAccessor};
2929

3030
pg_type! {
3131
#[derive(Debug)]
@@ -429,7 +429,7 @@ pub fn arrow_time_weighted_average_average<'a>(
429429
#[opname(->)]
430430
pub fn arrow_time_weighted_average_integral<'a>(
431431
tws: Option<TimeWeightSummary<'a>>,
432-
accessor: toolkit_experimental::AccessorIntegral<'a>,
432+
accessor: AccessorIntegral<'a>,
433433
) -> Option<f64> {
434434
time_weighted_average_integral(
435435
tws,
@@ -455,12 +455,7 @@ pub fn time_weighted_average_average<'a>(tws: Option<TimeWeightSummary<'a>>) ->
455455
}
456456
}
457457

458-
#[pg_extern(
459-
immutable,
460-
parallel_safe,
461-
name = "integral",
462-
schema = "toolkit_experimental"
463-
)]
458+
#[pg_extern(immutable, parallel_safe, name = "integral")]
464459
pub fn time_weighted_average_integral<'a>(
465460
tws: Option<TimeWeightSummary<'a>>,
466461
unit: default!(String, "'second'"),
@@ -530,12 +525,7 @@ pub fn arrow_time_weighted_average_interpolated_average<'a>(
530525
)
531526
}
532527

533-
#[pg_extern(
534-
immutable,
535-
parallel_safe,
536-
name = "interpolated_integral",
537-
schema = "toolkit_experimental"
538-
)]
528+
#[pg_extern(immutable, parallel_safe, name = "interpolated_integral")]
539529
pub fn time_weighted_average_interpolated_integral<'a>(
540530
tws: Option<TimeWeightSummary<'a>>,
541531
start: crate::raw::TimestampTz,
@@ -548,6 +538,44 @@ pub fn time_weighted_average_interpolated_integral<'a>(
548538
time_weighted_average_integral(target, unit)
549539
}
550540

541+
#[pg_operator(immutable, parallel_safe)]
542+
#[opname(->)]
543+
pub fn arrow_time_weighted_average_interpolated_integral<'a>(
544+
tws: Option<TimeWeightSummary<'a>>,
545+
accessor: TimeWeightInterpolatedIntegralAccessor<'a>,
546+
) -> Option<f64> {
547+
let prev = if accessor.flags & 1 == 1 {
548+
Some(accessor.prev.clone().into())
549+
} else {
550+
None
551+
};
552+
let next = if accessor.flags & 2 == 2 {
553+
Some(accessor.next.clone().into())
554+
} else {
555+
None
556+
};
557+
558+
// Convert from num of milliseconds to DurationUnit and then to string
559+
let unit = match accessor.unit {
560+
1 => DurationUnit::Microsec,
561+
1000 => DurationUnit::Millisec,
562+
1_000_000 => DurationUnit::Second,
563+
60_000_000 => DurationUnit::Minute,
564+
3_600_000_000 => DurationUnit::Hour,
565+
_ => todo!(), // This should never be reached, the accessor gets these numbers from microseconds() in duration.rs, which only matches on valid enum values
566+
}
567+
.to_string();
568+
569+
time_weighted_average_interpolated_integral(
570+
tws,
571+
accessor.start.into(),
572+
accessor.interval.into(),
573+
prev,
574+
next,
575+
unit,
576+
)
577+
}
578+
551579
#[cfg(any(test, feature = "pg_test"))]
552580
#[pg_schema]
553581
mod tests {
@@ -576,9 +604,9 @@ mod tests {
576604
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)";
577605
client.update(stmt, None, None).unwrap();
578606

579-
let stmt = "SELECT toolkit_experimental.integral(time_weight('Trapezoidal', ts, val), 'hrs') FROM test";
607+
let stmt = "SELECT integral(time_weight('Trapezoidal', ts, val), 'hrs') FROM test";
580608
assert_eq!(select_one!(client, stmt, f64), 0.0);
581-
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
609+
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
582610
assert_eq!(select_one!(client, stmt, f64), 0.0);
583611

584612
// add another point
@@ -622,9 +650,9 @@ mod tests {
622650
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
623651
assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON);
624652

625-
let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'mins') FROM test";
653+
let stmt = "SELECT integral(time_weight('Linear', ts, val), 'mins') FROM test";
626654
assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON);
627-
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'hour') FROM test";
655+
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'hour') FROM test";
628656
assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON);
629657

630658
//non-evenly spaced values
@@ -640,23 +668,23 @@ mod tests {
640668
// arrow syntax should be the same
641669
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
642670

643-
let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
671+
let stmt = "SELECT integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
644672
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
645673
let stmt = "SELECT time_weight('Linear', ts, val) \
646-
->toolkit_experimental.integral('microseconds') \
674+
->integral('microseconds') \
647675
FROM test";
648676
// arrow syntax should be the same
649677
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
650678
let stmt = "SELECT time_weight('Linear', ts, val) \
651-
->toolkit_experimental.integral() \
679+
->integral() \
652680
FROM test";
653681
assert!((select_one!(client, stmt, f64) - 25500.00).abs() < f64::EPSILON);
654682

655683
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
656684
// expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point
657685
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);
658686

659-
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
687+
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
660688
assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON);
661689

662690
//make sure this works with whatever ordering we throw at it
@@ -665,9 +693,9 @@ mod tests {
665693
let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
666694
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);
667695

668-
let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
696+
let stmt = "SELECT integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
669697
assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON);
670-
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
698+
let stmt = "SELECT integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
671699
assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON);
672700

673701
// make sure we get the same result if we do multi-level aggregation
@@ -898,7 +926,7 @@ mod tests {
898926
let mut integrals = client
899927
.update(
900928
r#"SELECT
901-
toolkit_experimental.interpolated_integral(
929+
interpolated_integral(
902930
agg,
903931
bucket,
904932
'1 day'::interval,
@@ -919,7 +947,7 @@ mod tests {
919947
client
920948
.update(
921949
r#"SELECT
922-
toolkit_experimental.interpolated_integral(
950+
interpolated_integral(
923951
agg,
924952
bucket,
925953
'1 day'::interval,

extension/src/time_weighted_average/accessors.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use pgx::*;
22

3+
use crate::time_weighted_average::DurationUnit;
34
use crate::{
45
datum_utils::interval_to_ms,
56
flatten, pg_type, ron_inout_funcs,
@@ -54,3 +55,61 @@ fn time_weight_interpolated_average_accessor<'a>(
5455
}
5556
}
5657
}
58+
59+
pg_type! {
60+
#[derive(Debug)]
61+
struct TimeWeightInterpolatedIntegralAccessor {
62+
start : i64,
63+
interval : i64,
64+
prev : TimeWeightSummaryData,
65+
pad : [u8;3],
66+
unit : u32,
67+
flags: u64,
68+
next : TimeWeightSummaryData,
69+
}
70+
}
71+
72+
ron_inout_funcs!(TimeWeightInterpolatedIntegralAccessor);
73+
74+
#[pg_extern(immutable, parallel_safe, name = "interpolated_integral")]
75+
fn time_weight_interpolated_integral_accessor<'a>(
76+
start: crate::raw::TimestampTz,
77+
interval: crate::raw::Interval,
78+
prev: Option<TimeWeightSummary<'a>>,
79+
next: Option<TimeWeightSummary<'a>>,
80+
unit: String,
81+
) -> TimeWeightInterpolatedIntegralAccessor<'static> {
82+
fn empty_summary<'b>() -> Option<TimeWeightSummary<'b>> {
83+
Some(unsafe {
84+
flatten!(TimeWeightSummary {
85+
first: TSPoint { ts: 0, val: 0.0 },
86+
last: TSPoint { ts: 0, val: 0.0 },
87+
weighted_sum: 0.0,
88+
method: TimeWeightMethod::LOCF,
89+
})
90+
})
91+
}
92+
93+
let unit = match DurationUnit::from_str(&unit) {
94+
Some(unit) => unit.microseconds(),
95+
None => pgx::error!(
96+
"Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour",
97+
unit,
98+
),
99+
};
100+
let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 };
101+
let prev = prev.or_else(empty_summary).unwrap().0;
102+
let next = next.or_else(empty_summary).unwrap().0;
103+
let interval = interval_to_ms(&start, &interval);
104+
crate::build! {
105+
TimeWeightInterpolatedIntegralAccessor {
106+
start: start.into(),
107+
interval,
108+
prev,
109+
pad : [0,0,0],
110+
unit,
111+
flags,
112+
next,
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)