Skip to content

Commit 537a4a0

Browse files
bors[bot]rtwalker
andauthored
Merge #624
624: Remove partial aggregation for Candlestick aggregates r=rtwalker a=rtwalker Putting this here as an option for putting out a patch release until we have a proper fix for #611. We've determined that the cause for the bad results lives somewhere in the functions that are used to support partial aggregation. We can at least prevent folks from running the candlestick aggregates in parallel mode and hitting this bug by dropping support for partial aggregation until we've resolved the issue. Co-authored-by: Ryan Walker <[email protected]>
2 parents bd9b1d9 + 408b498 commit 537a4a0

File tree

2 files changed

+10
-85
lines changed

2 files changed

+10
-85
lines changed

Changelog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
99
#### New experimental features
1010

1111
#### Bug fixes
12+
- [#624](https://github.com/timescale/timescaledb-toolkit/pull/624): Remove partial aggregation for Candlestick aggregates.
13+
We've determined that the cause for the bad results lives somewhere in the functions that are used to support partial aggregation.
14+
We can at least prevent folks from running the candlestick aggregates in parallel mode and hitting this bug by dropping support for partial aggregation until we've resolved the issue.
1215

1316
#### Other notable changes
1417

extension/src/ohlc.rs

Lines changed: 7 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use crate::{
55
aggregate_utils::in_aggregate_context,
66
flatten,
77
palloc::{Inner, Internal, InternalAsValue, ToInternal},
8-
pg_type,
9-
raw::bytea,
10-
ron_inout_funcs,
8+
pg_type, ron_inout_funcs,
119
};
1210
use tspoint::TSPoint;
1311

@@ -313,72 +311,16 @@ pub fn candlestick_final_inner(
313311
}
314312
}
315313

316-
#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")]
317-
pub fn candlestick_combine(
318-
state1: Internal,
319-
state2: Internal,
320-
fcinfo: pg_sys::FunctionCallInfo,
321-
) -> Option<Internal> {
322-
unsafe { candlestick_combine_inner(state1.to_inner(), state2.to_inner(), fcinfo).internal() }
323-
}
324-
325-
pub fn candlestick_combine_inner<'input>(
326-
state1: Option<Inner<Candlestick<'input>>>,
327-
state2: Option<Inner<Candlestick<'input>>>,
328-
fcinfo: pg_sys::FunctionCallInfo,
329-
) -> Option<Inner<Candlestick<'input>>> {
330-
unsafe {
331-
in_aggregate_context(fcinfo, || match (state1, state2) {
332-
(None, None) => None,
333-
(None, Some(only)) | (Some(only), None) => Some(only),
334-
(Some(a), Some(b)) => {
335-
let (mut a, b) = (*a, *b);
336-
a.combine(&b);
337-
Some(a.into())
338-
}
339-
})
340-
}
341-
}
342-
343-
#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")]
344-
pub fn candlestick_serialize(state: Internal) -> bytea {
345-
let cs: &mut Candlestick = unsafe { state.get_mut().unwrap() };
346-
let ser = &**cs;
347-
crate::do_serialize!(ser)
348-
}
349-
350-
#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")]
351-
pub fn candlestick_deserialize(bytes: bytea, _internal: Internal) -> Option<Internal> {
352-
candlestick_deserialize_inner(bytes).internal()
353-
}
354-
355-
pub fn candlestick_deserialize_inner(bytes: bytea) -> Inner<Candlestick<'static>> {
356-
use crate::ohlc::toolkit_experimental::CandlestickData;
357-
let de: CandlestickData = crate::do_deserialize!(bytes, CandlestickData);
358-
let cs: Candlestick = de.into();
359-
cs.into()
360-
}
361-
362314
extension_sql!(
363315
"\n\
364316
CREATE AGGREGATE toolkit_experimental.ohlc( ts timestamptz, price DOUBLE PRECISION )\n\
365317
(\n\
366318
sfunc = toolkit_experimental.tick_data_no_vol_transition,\n\
367319
stype = internal,\n\
368-
finalfunc = toolkit_experimental.candlestick_final,\n\
369-
combinefunc = toolkit_experimental.candlestick_combine,\n\
370-
serialfunc = toolkit_experimental.candlestick_serialize,\n\
371-
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
372-
parallel = safe\n\
320+
finalfunc = toolkit_experimental.candlestick_final\n\
373321
);\n",
374322
name = "ohlc",
375-
requires = [
376-
tick_data_no_vol_transition,
377-
candlestick_final,
378-
candlestick_combine,
379-
candlestick_serialize,
380-
candlestick_deserialize
381-
],
323+
requires = [tick_data_no_vol_transition, candlestick_final,],
382324
);
383325

384326
extension_sql!(
@@ -391,20 +333,10 @@ extension_sql!(
391333
(\n\
392334
sfunc = toolkit_experimental.tick_data_transition,\n\
393335
stype = internal,\n\
394-
finalfunc = toolkit_experimental.candlestick_final,\n\
395-
combinefunc = toolkit_experimental.candlestick_combine,\n\
396-
serialfunc = toolkit_experimental.candlestick_serialize,\n\
397-
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
398-
parallel = safe\n\
336+
finalfunc = toolkit_experimental.candlestick_final\n\
399337
);\n",
400338
name = "candlestick_agg",
401-
requires = [
402-
tick_data_transition,
403-
candlestick_final,
404-
candlestick_combine,
405-
candlestick_serialize,
406-
candlestick_deserialize
407-
],
339+
requires = [tick_data_transition, candlestick_final,],
408340
);
409341

410342
extension_sql!(
@@ -413,20 +345,10 @@ extension_sql!(
413345
(\n\
414346
sfunc = toolkit_experimental.candlestick_rollup_trans,\n\
415347
stype = internal,\n\
416-
finalfunc = toolkit_experimental.candlestick_final,\n\
417-
combinefunc = toolkit_experimental.candlestick_combine,\n\
418-
serialfunc = toolkit_experimental.candlestick_serialize,\n\
419-
deserialfunc = toolkit_experimental.candlestick_deserialize,\n\
420-
parallel = safe\n\
348+
finalfunc = toolkit_experimental.candlestick_final\n\
421349
);\n",
422350
name = "ohlc_rollup",
423-
requires = [
424-
candlestick_rollup_trans,
425-
candlestick_final,
426-
candlestick_combine,
427-
candlestick_serialize,
428-
candlestick_deserialize
429-
],
351+
requires = [candlestick_rollup_trans, candlestick_final,],
430352
);
431353

432354
#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")]

0 commit comments

Comments
 (0)