1 change: 1 addition & 0 deletions Changelog.md
@@ -17,6 +17,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
- [#741](https://github.com/timescale/timescaledb-toolkit/pull/741): Stabilize `approx_count_distinct`
- [#748](https://github.com/timescale/timescaledb-toolkit/pull/748): Stabilize `approx_percentile_array`
- [#745](https://github.com/timescale/timescaledb-toolkit/pull/745): Stabilize date utility functions
- [#751](https://github.com/timescale/timescaledb-toolkit/pull/751): Stabilize `min_n`/`max_n`/`min_n_by`/`max_n_by`

#### Other notable changes
- [#743](https://github.com/timescale/timescaledb-toolkit/pull/743): Remove support for direct upgrades from toolkit versions more than 1 year old. Toolkit versions 1.4.x and 1.5.x will have to upgrade to an intermediate version before upgrading to 1.16.0.
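For context, the practical effect of the #751 entry is that these four aggregates can now be called without the toolkit_experimental schema prefix. A minimal usage sketch, assuming a hypothetical sensor_readings table (the call shape follows the aggregate definitions later in this PR):

    -- Top 3 temperatures, carrying the full source row along as the payload.
    -- No toolkit_experimental prefix is required after this change.
    SELECT into_values(max_n_by(temperature, sensor_readings, 3), NULL::sensor_readings)
    FROM sensor_readings;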
1 change: 1 addition & 0 deletions extension/src/accessors.rs
@@ -102,6 +102,7 @@ accessor! { dead_ranges() }
accessor! { uptime() }
accessor! { downtime() }
accessor! { into_values() }
accessor! { into_array() }
accessor! { into_int_values() }
accessor! { state_timeline() }
accessor! { state_int_timeline() }
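The new into_array() entry registers an arrow-style accessor for the stabilized n-most aggregates. A rough sketch of how it would be used, assuming into_array() is paired with max_n the same way the existing into_values accessor is (table and column names are hypothetical):

    -- Largest 5 values returned as a plain array via the arrow accessor.
    SELECT max_n(temperature, 5) -> into_array()
    FROM sensor_readings;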
103 changes: 48 additions & 55 deletions extension/src/nmost/max_by_float.rs
@@ -1,6 +1,6 @@
use pgx::{iter::TableIterator, *};

use crate::nmost::max_float::toolkit_experimental::*;
use crate::nmost::max_float::*;
use crate::nmost::*;

use crate::{
@@ -14,42 +14,37 @@ use std::cmp::Reverse;

type MaxByFloatTransType = NMostByTransState<Reverse<NotNan<f64>>>;

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
struct MaxByFloats<'input> {
values: MaxFloatsData<'input>, // Nesting pg_types adds 8 bytes of header
data: DatumStore<'input>,
}
pg_type! {
#[derive(Debug)]
struct MaxByFloats<'input> {
values: MaxFloatsData<'input>, // Nesting pg_types adds 8 bytes of header
data: DatumStore<'input>,
}
ron_inout_funcs!(MaxByFloats);

impl<'input> From<MaxByFloatTransType> for MaxByFloats<'input> {
fn from(item: MaxByFloatTransType) -> Self {
let (capacity, val_ary, data) = item.into_sorted_parts();
unsafe {
flatten!(MaxByFloats {
values: build!(MaxFloats {
capacity: capacity as u32,
elements: val_ary.len() as u32,
values: val_ary
.into_iter()
.map(|x| f64::from(x.0))
.collect::<Vec<f64>>()
.into()
})
.0,
data,
}
ron_inout_funcs!(MaxByFloats);

impl<'input> From<MaxByFloatTransType> for MaxByFloats<'input> {
fn from(item: MaxByFloatTransType) -> Self {
let (capacity, val_ary, data) = item.into_sorted_parts();
unsafe {
flatten!(MaxByFloats {
values: build!(MaxFloats {
capacity: capacity as u32,
elements: val_ary.len() as u32,
values: val_ary
.into_iter()
.map(|x| f64::from(x.0))
.collect::<Vec<f64>>()
.into()
})
}
.0,
data,
})
}
}
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_float_trans(
state: Internal,
value: f64,
@@ -67,10 +62,10 @@ pub fn max_n_by_float_trans(
.internal()
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_float_rollup_trans(
state: Internal,
value: toolkit_experimental::MaxByFloats<'static>,
value: MaxByFloats<'static>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
let values: Vec<Reverse<NotNan<f64>>> = value
@@ -90,19 +85,14 @@ pub fn max_n_by_float_rollup_trans(
.internal()
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn max_n_by_float_final(state: Internal) -> toolkit_experimental::MaxByFloats<'static> {
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_float_final(state: Internal) -> MaxByFloats<'static> {
unsafe { state.to_inner::<MaxByFloatTransType>().unwrap().clone() }.into()
}

#[pg_extern(
schema = "toolkit_experimental",
name = "into_values",
immutable,
parallel_safe
)]
#[pg_extern(name = "into_values", immutable, parallel_safe)]
pub fn max_n_by_float_to_values(
agg: toolkit_experimental::MaxByFloats<'static>,
agg: MaxByFloats<'static>,
_dummy: Option<AnyElement>,
) -> TableIterator<'static, (name!(value, f64), name!(data, AnyElement))> {
TableIterator::new(
@@ -116,12 +106,12 @@ pub fn max_n_by_float_to_values(

extension_sql!(
"\n\
CREATE AGGREGATE toolkit_experimental.max_n_by(\n\
CREATE AGGREGATE max_n_by(\n\
value double precision, data AnyElement, capacity bigint\n\
) (\n\
sfunc = toolkit_experimental.max_n_by_float_trans,\n\
sfunc = max_n_by_float_trans,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.max_n_by_float_final\n\
finalfunc = max_n_by_float_final\n\
);\n\
",
name = "max_n_by_float",
@@ -130,12 +120,12 @@ extension_sql!(

extension_sql!(
"\n\
CREATE AGGREGATE toolkit_experimental.rollup(\n\
toolkit_experimental.MaxByFloats\n\
CREATE AGGREGATE rollup(\n\
MaxByFloats\n\
) (\n\
sfunc = toolkit_experimental.max_n_by_float_rollup_trans,\n\
sfunc = max_n_by_float_rollup_trans,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.max_n_by_float_final\n\
finalfunc = max_n_by_float_final\n\
);\n\
",
name = "max_n_by_float_rollup",
@@ -173,10 +163,13 @@ mod tests {
}

// Test into_values
let mut result =
client.update("SELECT toolkit_experimental.into_values(toolkit_experimental.max_n_by(val, data, 3), NULL::data)::TEXT from data",
None, None,
).unwrap();
let mut result = client
.update(
"SELECT into_values(max_n_by(val, data, 3), NULL::data)::TEXT from data",
None,
None,
)
.unwrap();
assert_eq!(
result.next().unwrap()[1].value().unwrap(),
Some("(0.7734375,\"(0.7734375,3)\")")
@@ -194,8 +187,8 @@ mod tests {
// Test rollup
let mut result =
client.update(
"WITH aggs as (SELECT category, toolkit_experimental.max_n_by(val, data, 5) as agg from data GROUP BY category)
SELECT toolkit_experimental.into_values(toolkit_experimental.rollup(agg), NULL::data)::TEXT FROM aggs",
"WITH aggs as (SELECT category, max_n_by(val, data, 5) as agg from data GROUP BY category)
SELECT into_values(rollup(agg), NULL::data)::TEXT FROM aggs",
None, None,
).unwrap();
assert_eq!(
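For reference, the call pattern this file now exposes outside the experimental schema is the one exercised by the test above; a condensed sketch against the same test table:

    -- Top 3 float values with their source rows attached.
    SELECT into_values(max_n_by(val, data, 3), NULL::data)::TEXT
    FROM data;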
103 changes: 48 additions & 55 deletions extension/src/nmost/max_by_int.rs
@@ -1,6 +1,6 @@
use pgx::{iter::TableIterator, *};

use crate::nmost::max_int::toolkit_experimental::*;
use crate::nmost::max_int::*;
use crate::nmost::*;

use crate::{
@@ -13,42 +13,37 @@ use std::cmp::Reverse;

type MaxByIntTransType = NMostByTransState<Reverse<i64>>;

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
struct MaxByInts<'input> {
values: MaxIntsData<'input>, // Nesting pg_types adds 8 bytes of header
data: DatumStore<'input>,
}
pg_type! {
#[derive(Debug)]
struct MaxByInts<'input> {
values: MaxIntsData<'input>, // Nesting pg_types adds 8 bytes of header
data: DatumStore<'input>,
}
ron_inout_funcs!(MaxByInts);

impl<'input> From<MaxByIntTransType> for MaxByInts<'input> {
fn from(item: MaxByIntTransType) -> Self {
let (capacity, val_ary, data) = item.into_sorted_parts();
unsafe {
flatten!(MaxByInts {
values: build!(MaxInts {
capacity: capacity as u32,
elements: val_ary.len() as u32,
values: val_ary
.into_iter()
.map(|x| x.0)
.collect::<Vec<i64>>()
.into()
})
.0,
data,
}
ron_inout_funcs!(MaxByInts);

impl<'input> From<MaxByIntTransType> for MaxByInts<'input> {
fn from(item: MaxByIntTransType) -> Self {
let (capacity, val_ary, data) = item.into_sorted_parts();
unsafe {
flatten!(MaxByInts {
values: build!(MaxInts {
capacity: capacity as u32,
elements: val_ary.len() as u32,
values: val_ary
.into_iter()
.map(|x| x.0)
.collect::<Vec<i64>>()
.into()
})
}
.0,
data,
})
}
}
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_int_trans(
state: Internal,
value: i64,
@@ -66,10 +61,10 @@ pub fn max_n_by_int_trans(
.internal()
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_int_rollup_trans(
state: Internal,
value: toolkit_experimental::MaxByInts<'static>,
value: MaxByInts<'static>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
let values: Vec<Reverse<i64>> = value
@@ -89,19 +84,14 @@ pub fn max_n_by_int_rollup_trans(
.internal()
}

#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)]
pub fn max_n_by_int_final(state: Internal) -> toolkit_experimental::MaxByInts<'static> {
#[pg_extern(immutable, parallel_safe)]
pub fn max_n_by_int_final(state: Internal) -> MaxByInts<'static> {
unsafe { state.to_inner::<MaxByIntTransType>().unwrap().clone() }.into()
}

#[pg_extern(
schema = "toolkit_experimental",
name = "into_values",
immutable,
parallel_safe
)]
#[pg_extern(name = "into_values", immutable, parallel_safe)]
pub fn max_n_by_int_to_values(
agg: toolkit_experimental::MaxByInts<'static>,
agg: MaxByInts<'static>,
_dummy: Option<AnyElement>,
) -> TableIterator<'static, (name!(value, i64), name!(data, AnyElement))> {
TableIterator::new(
@@ -115,12 +105,12 @@ pub fn max_n_by_int_to_values(

extension_sql!(
"\n\
CREATE AGGREGATE toolkit_experimental.max_n_by(\n\
CREATE AGGREGATE max_n_by(\n\
value bigint, data AnyElement, capacity bigint\n\
) (\n\
sfunc = toolkit_experimental.max_n_by_int_trans,\n\
sfunc = max_n_by_int_trans,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.max_n_by_int_final\n\
finalfunc = max_n_by_int_final\n\
);\n\
",
name = "max_n_by_int",
@@ -129,12 +119,12 @@ extension_sql!(

extension_sql!(
"\n\
CREATE AGGREGATE toolkit_experimental.rollup(\n\
toolkit_experimental.MaxByInts\n\
CREATE AGGREGATE rollup(\n\
MaxByInts\n\
) (\n\
sfunc = toolkit_experimental.max_n_by_int_rollup_trans,\n\
sfunc = max_n_by_int_rollup_trans,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.max_n_by_int_final\n\
finalfunc = max_n_by_int_final\n\
);\n\
",
name = "max_n_by_int_rollup",
@@ -168,10 +158,13 @@ mod tests {
}

// Test into_values
let mut result =
client.update("SELECT toolkit_experimental.into_values(toolkit_experimental.max_n_by(val, data, 3), NULL::data)::TEXT from data",
None, None,
).unwrap();
let mut result = client
.update(
"SELECT into_values(max_n_by(val, data, 3), NULL::data)::TEXT from data",
None,
None,
)
.unwrap();
assert_eq!(
result.next().unwrap()[1].value().unwrap(),
Some("(99,\"(99,3)\")")
@@ -189,8 +182,8 @@ mod tests {
// Test rollup
let mut result =
client.update(
"WITH aggs as (SELECT category, toolkit_experimental.max_n_by(val, data, 5) as agg from data GROUP BY category)
SELECT toolkit_experimental.into_values(toolkit_experimental.rollup(agg), NULL::data)::TEXT FROM aggs",
"WITH aggs as (SELECT category, max_n_by(val, data, 5) as agg from data GROUP BY category)
SELECT into_values(rollup(agg), NULL::data)::TEXT FROM aggs",
None, None,
).unwrap();
assert_eq!(
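The integer variant mirrors the float one, including the two-step rollup exercised in the test above. A condensed sketch of that pattern, using the same test table:

    -- Per-category top 5, then merged across groups with rollup().
    WITH aggs AS (
        SELECT category, max_n_by(val, data, 5) AS agg
        FROM data
        GROUP BY category
    )
    SELECT into_values(rollup(agg), NULL::data)::TEXT
    FROM aggs;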