Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
#### New experimental features

#### Bug fixes
- [#624](https://github.com/timescale/timescaledb-toolkit/pull/624): Remove partial aggregation for Candlestick aggregates.
We've determined that the cause for the bad results lives somewhere in the functions that are used to support partial aggregation.
We can at least prevent folks from running the candlestick aggregates in parallel mode and hitting this bug by dropping support for partial aggregation until we've resolved the issue.

#### Other notable changes

Expand Down
92 changes: 7 additions & 85 deletions extension/src/ohlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{
aggregate_utils::in_aggregate_context,
flatten,
palloc::{Inner, Internal, InternalAsValue, ToInternal},
pg_type,
raw::bytea,
ron_inout_funcs,
pg_type, ron_inout_funcs,
};
use tspoint::TSPoint;

Expand Down Expand Up @@ -313,72 +311,16 @@ pub fn candlestick_final_inner(
}
}

#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")]
pub fn candlestick_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
unsafe { candlestick_combine_inner(state1.to_inner(), state2.to_inner(), fcinfo).internal() }
}

pub fn candlestick_combine_inner<'input>(
state1: Option<Inner<Candlestick<'input>>>,
state2: Option<Inner<Candlestick<'input>>>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<Candlestick<'input>>> {
unsafe {
in_aggregate_context(fcinfo, || match (state1, state2) {
(None, None) => None,
(None, Some(only)) | (Some(only), None) => Some(only),
(Some(a), Some(b)) => {
let (mut a, b) = (*a, *b);
a.combine(&b);
Some(a.into())
}
})
}
}

#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")]
pub fn candlestick_serialize(state: Internal) -> bytea {
let cs: &mut Candlestick = unsafe { state.get_mut().unwrap() };
let ser = &**cs;
crate::do_serialize!(ser)
}

#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")]
pub fn candlestick_deserialize(bytes: bytea, _internal: Internal) -> Option<Internal> {
candlestick_deserialize_inner(bytes).internal()
}

pub fn candlestick_deserialize_inner(bytes: bytea) -> Inner<Candlestick<'static>> {
use crate::ohlc::toolkit_experimental::CandlestickData;
let de: CandlestickData = crate::do_deserialize!(bytes, CandlestickData);
let cs: Candlestick = de.into();
cs.into()
}

extension_sql!(
"\n\
CREATE AGGREGATE toolkit_experimental.ohlc( ts timestamptz, price DOUBLE PRECISION )\n\
(\n\
sfunc = toolkit_experimental.tick_data_no_vol_transition,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.candlestick_final,\n\
combinefunc = toolkit_experimental.candlestick_combine,\n\
serialfunc = toolkit_experimental.candlestick_serialize,\n\
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
parallel = safe\n\
finalfunc = toolkit_experimental.candlestick_final\n\
);\n",
name = "ohlc",
requires = [
tick_data_no_vol_transition,
candlestick_final,
candlestick_combine,
candlestick_serialize,
candlestick_deserialize
],
requires = [tick_data_no_vol_transition, candlestick_final,],
);

extension_sql!(
Expand All @@ -391,20 +333,10 @@ extension_sql!(
(\n\
sfunc = toolkit_experimental.tick_data_transition,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.candlestick_final,\n\
combinefunc = toolkit_experimental.candlestick_combine,\n\
serialfunc = toolkit_experimental.candlestick_serialize,\n\
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
parallel = safe\n\
finalfunc = toolkit_experimental.candlestick_final\n\
);\n",
name = "candlestick_agg",
requires = [
tick_data_transition,
candlestick_final,
candlestick_combine,
candlestick_serialize,
candlestick_deserialize
],
requires = [tick_data_transition, candlestick_final,],
);

extension_sql!(
Expand All @@ -413,20 +345,10 @@ extension_sql!(
(\n\
sfunc = toolkit_experimental.candlestick_rollup_trans,\n\
stype = internal,\n\
finalfunc = toolkit_experimental.candlestick_final,\n\
combinefunc = toolkit_experimental.candlestick_combine,\n\
serialfunc = toolkit_experimental.candlestick_serialize,\n\
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
parallel = safe\n\
finalfunc = toolkit_experimental.candlestick_final\n\
);\n",
name = "ohlc_rollup",
requires = [
candlestick_rollup_trans,
candlestick_final,
candlestick_combine,
candlestick_serialize,
candlestick_deserialize
],
requires = [candlestick_rollup_trans, candlestick_final,],
);

#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")]
Expand Down