Skip to content

Commit 98cb978

Browse files
committed
Add experimental library for client-side use, with examples.
For issue #485.
1 parent 0fdc32b commit 98cb978

File tree

10 files changed

+514
-94
lines changed

10 files changed

+514
-94
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,11 @@
22
resolver = "2"
33

44
members = [
5+
"crates/t-digest-lib",
56
"extension",
6-
"crates/encodings",
7-
"crates/flat_serialize/flat_serialize_macro",
8-
"crates/flat_serialize/flat_serialize",
9-
"crates/t-digest",
10-
"crates/hyperloglogplusplus",
11-
"crates/udd-sketch",
12-
"crates/time-weighted-average",
137
"tools/post-install",
148
"tools/sql-doctester",
159
"tools/update-tester",
16-
"crates/asap",
17-
"crates/counter-agg",
18-
"crates/tspoint",
19-
"crates/stats-agg",
20-
"crates/aggregate_builder",
21-
"crates/scripting-utilities/*",
22-
"crates/count-min-sketch",
2310
]
2411

2512
[profile.release]

crates/t-digest-lib/Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# FFI shim crate exposing the pure-Rust `tdigest` crate to C callers.
# Experimental: see docs/client.md.
[package]
name = "tdigest-lib"
version = "0.0.0"

[lib]
# Library name matches the `timescaledb_toolkit_tdigest_*` prefix of the
# exported extern "C" symbols in src/lib.rs.
name = "timescaledb_toolkit_tdigest"
# cdylib for dynamic linking from other languages, staticlib for embedding.
crate-type = ["cdylib", "staticlib"]

[dependencies]
# Used for malloc/memcpy when handing strings across the FFI boundary.
libc = "0.2.135"

tdigest = { path="../t-digest" }

crates/t-digest-lib/src/lib.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#[no_mangle]
2+
pub extern "C" fn timescaledb_toolkit_tdigest_builder_with_size(
3+
size: usize,
4+
) -> Box<tdigest::Builder> {
5+
Box::new(tdigest::Builder::with_size(size))
6+
}
7+
8+
#[no_mangle]
9+
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_push(
10+
builder: *mut tdigest::Builder,
11+
value: f64,
12+
) {
13+
(*builder).push(value)
14+
}
15+
16+
#[no_mangle]
17+
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_merge(
18+
builder: *mut tdigest::Builder,
19+
other: Box<tdigest::Builder>,
20+
) {
21+
let other = *other;
22+
(*builder).merge(other)
23+
}
24+
25+
#[no_mangle]
26+
pub extern "C" fn timescaledb_toolkit_tdigest_builder_free(_: Box<tdigest::Builder>) {}
27+
28+
#[no_mangle]
29+
pub extern "C" fn timescaledb_toolkit_tdigest_build(
30+
mut builder: Box<tdigest::Builder>,
31+
) -> Box<tdigest::TDigest> {
32+
Box::new(builder.build())
33+
}
34+
35+
#[no_mangle]
36+
pub extern "C" fn timescaledb_toolkit_tdigest_free(_: Box<tdigest::TDigest>) {}
37+
38+
// TODO Messy, but good enough to experiment with. We might want to
39+
// into_raw_parts the String and offer a transparent struct containing pointer
40+
// to and size of the buffer, with a ts_tk_tdigest_string_free taking it back
41+
// and releasing it. That also avoids one copy.
42+
#[no_mangle]
43+
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_format_for_postgres(
44+
td: *const tdigest::TDigest,
45+
) -> *mut libc::c_char {
46+
let s = (*td).format_for_postgres();
47+
let buf = libc::malloc(s.len() + 1);
48+
libc::memcpy(buf, s.as_ptr() as *const libc::c_void, s.len());
49+
let buf = buf as *mut libc::c_char;
50+
let r = std::slice::from_raw_parts_mut(buf, s.len() + 1);
51+
r[s.len()] = 0;
52+
buf as *mut libc::c_char
53+
}

crates/t-digest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = "2021"
88
flat_serialize = {path="../flat_serialize/flat_serialize"}
99
flat_serialize_macro = {path="../flat_serialize/flat_serialize_macro"}
1010
ordered-float = {version = "1.0", features = ["serde"] }
11+
ron = "0.6.0"
1112
serde = { version = "1.0", features = ["derive"] }
1213

1314
[dev-dependencies]

crates/t-digest/src/lib.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,35 @@ impl TDigest {
205205
pub fn num_buckets(&self) -> usize {
206206
self.centroids.len()
207207
}
208+
209+
/// Serializes this digest as the ron text accepted by the extension's
/// version-1 `tdigest` type, e.g. for a client-side INSERT of a digest
/// built outside postgres (see docs/client.md).
pub fn format_for_postgres(&self) -> String {
    /// Mimics the version-1 serialization format the extension uses. TODO don't!
    // NOTE(review): field declaration order presumably must match what the
    // extension's deserializer expects — confirm before reordering.
    #[derive(Serialize)]
    struct Hack {
        version: u32,
        buckets: usize,
        max_buckets: usize,
        count: u64,
        sum: f64,
        min: f64,
        max: f64,
        centroids: Vec<Centroid>,
    }

    let max_buckets = self.max_size();
    let centroids = self.raw_centroids();
    // unwrap: serializing this plain in-memory struct to a String is not
    // expected to fail; a failure here would be a bug.
    ron::to_string(&Hack {
        version: 1,
        max_buckets,
        buckets: centroids.len(),
        count: self.count(),
        sum: self.sum(),
        min: self.min(),
        max: self.max(),
        centroids: centroids.to_vec(),
    })
    .unwrap()
}
208237
}
209238

210239
impl Default for TDigest {
@@ -649,6 +678,65 @@ impl TDigest {
649678
}
650679
}
651680

681+
// This is a tdigest object paired
// with a vector of values that still need to be inserted.
//
// `buffer` is excluded from serialization (`serde(skip)`); only the
// digested state persists, and `buffer` deserializes as empty.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Builder {
    // Values pushed but not yet folded into `digested`.
    #[serde(skip)]
    buffer: Vec<f64>,
    // Running digest of everything flushed so far.
    digested: TDigest,
}
689+
690+
// Wraps an existing digest in a Builder with an empty pending buffer,
// so further values can be pushed into it.
impl From<TDigest> for Builder {
    fn from(digested: TDigest) -> Self {
        Self {
            digested,
            ..Default::default()
        }
    }
}
698+
699+
impl Builder {
700+
pub fn with_size(size: usize) -> Self {
701+
Self::from(TDigest::new_with_size(size))
702+
}
703+
704+
// Add a new value, recalculate the digest if we've crossed a threshold.
705+
// TODO threshold is currently set to number of digest buckets, should this be adjusted
706+
pub fn push(&mut self, value: f64) {
707+
self.buffer.push(value);
708+
if self.buffer.len() >= self.digested.max_size() {
709+
self.digest()
710+
}
711+
}
712+
713+
// Update the digest with all accumulated values.
714+
fn digest(&mut self) {
715+
if self.buffer.is_empty() {
716+
return;
717+
}
718+
let new = std::mem::take(&mut self.buffer);
719+
self.digested = self.digested.merge_unsorted(new)
720+
}
721+
722+
pub fn build(&mut self) -> TDigest {
723+
self.digest();
724+
std::mem::take(&mut self.digested)
725+
}
726+
727+
pub fn merge(&mut self, other: Self) {
728+
assert_eq!(self.digested.max_size(), other.digested.max_size());
729+
let digvec = vec![std::mem::take(&mut self.digested), other.digested];
730+
if !self.buffer.is_empty() {
731+
digvec[0].merge_unsorted(std::mem::take(&mut self.buffer));
732+
}
733+
if !other.buffer.is_empty() {
734+
digvec[1].merge_unsorted(other.buffer);
735+
}
736+
self.digested = TDigest::merge_digests(digvec);
737+
}
738+
}
739+
652740
#[cfg(test)]
653741
mod tests {
654742
use super::*;

docs/client.md

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Client-side aggregation [<sup><mark>experimental</mark></sup>](/docs/README.md#tag-notes)
2+
3+
- Current status: prototype
4+
- Effort remaining: lots
5+
6+
## Purpose
7+
8+
We have long suspected it might be valuable to allow building aggregates
9+
client-side rather than requiring all data be stored in postgres and
10+
aggregated within the toolkit.
11+
12+
https://github.com/timescale/timescaledb-toolkit/issues/485 recently came in
13+
adding weight to this idea. Because this customer requests tdigest, that's
14+
what we'll use for prototyping.
15+
16+
## Use cases
17+
18+
Quoting the above customer:
19+
20+
"In some cases it is not possible to transfer all the non-aggregated data to
21+
TimescaleDB due to it's amount and/or limited connectivity."
22+
23+
## Questions
24+
25+
- Do we want to support a public crate?
26+
- What does that mean?
27+
- Do we need to monitor an email address?
28+
- What promise would we make on response time?
29+
- Is this materially different from what we've already signed up for by
30+
publishing on github?
31+
- How do we handle ownership of the crates.io credentials?
32+
33+
- Which license do we use?
34+
- Some of our code is already a derived work - do we permissively license it
35+
all, or restrict some of it?
36+
37+
- Wire protocol maintenance
38+
- This is a problem we already have, we just didn't realize it, as it is
39+
already possible to construct our aggregates and INSERT them, and they
40+
also appear in pg dumps; at the moment, you can restore those dumps, though we
41+
haven't made any promise about it. On our stabilized aggregates, users
42+
may assume that is stabilized, too.
43+
- Is there a practical concern here? Or do we just say "not supported"?
44+
- Is it possible to crash the extension with invalid inputs?
45+
- If we commit to a public wire protocol, shouldn't we avoid the
46+
Rust-specific ron and go for something more common?
47+
48+
## Proposal
49+
50+
As a first step, build a crate which externalizes tdigest aggregate creation
51+
(this is prototyped on eg/client branch):
52+
53+
```rust
54+
let mut digester = tdigest::Builder::with_size(N);
55+
loop {
56+
digester.push(value);
57+
}
58+
send_to_postgres(format!("INSERT INTO digests VALUES ({})", digester.build().format_for_postgres()));
59+
```
60+
61+
A good second project would be to add sample projects calling it from popular
62+
languages such as C++ and Python.
63+
64+
In order to provide that API, we must first reorganize the tdigest
65+
implementation so that all business logic is in the tdigest crate. Some is
66+
currently in the pgx extension crate.
67+
68+
For each aggregate, the transient state is actually a Builder pattern hidden
69+
behind pgx machinery.
70+
71+
On this branch, I've moved TDigestTransState into tdigest::Builder.
72+
73+
Currently, we use default ron behavior to serialize the raw implementation
74+
details of the pg_type. Users can insert inconsistent data now, and it
75+
doesn't look like we validate that at insertion time.
76+
77+
We should reconsider this for all pg_types regardless of the overall client
78+
project. Is it possible NOT to offer serialized insertion at all? If so,
79+
turning that off would be a good first step.
80+
81+
Then we can enable it just where we want to.
82+
83+
We should put more thought into the serialization format we intentionally
84+
support. Currently it contains redundancy which we can eliminate by
85+
implementing serialization carefully rather than relying on defaults.
86+
87+
## Proof of concept
88+
89+
This is a simple demonstration of inserting serialized tdigest into a table,
90+
showing that it works the same way as an aggregate built by the extension.
91+
92+
```SQL ,non-transactional
93+
CREATE TABLE test (data DOUBLE PRECISION);
94+
INSERT INTO test SELECT generate_series(0.01, 1, 0.01);
95+
96+
CREATE VIEW digest AS SELECT tdigest(100, data) FROM test;
97+
98+
CREATE TABLE digest2 (tdigest tdigest);
99+
INSERT INTO digest2 VALUES ('(version:1,max_buckets:100,count:100,sum:50.50000000000001,min:0.01,max:1,centroids:[(mean:0.01,weight:1),(mean:0.02,weight:1),(mean:0.03,weight:1),(mean:0.04,weight:1),(mean:0.05,weight:1),(mean:0.06,weight:1),(mean:0.07,weight:1),(mean:0.08,weight:1),(mean:0.09,weight:1),(mean:0.1,weight:1),(mean:0.11,weight:1),(mean:0.12,weight:1),(mean:0.13,weight:1),(mean:0.14,weight:1),(mean:0.15,weight:1),(mean:0.16,weight:1),(mean:0.17,weight:1),(mean:0.18,weight:1),(mean:0.19,weight:1),(mean:0.2,weight:1),(mean:0.21,weight:1),(mean:0.22,weight:1),(mean:0.23,weight:1),(mean:0.24,weight:1),(mean:0.25,weight:1),(mean:0.26,weight:1),(mean:0.27,weight:1),(mean:0.28,weight:1),(mean:0.29,weight:1),(mean:0.3,weight:1),(mean:0.31,weight:1),(mean:0.32,weight:1),(mean:0.33,weight:1),(mean:0.34,weight:1),(mean:0.35,weight:1),(mean:0.36,weight:1),(mean:0.37,weight:1),(mean:0.38,weight:1),(mean:0.39,weight:1),(mean:0.4,weight:1),(mean:0.41,weight:1),(mean:0.42,weight:1),(mean:0.43,weight:1),(mean:0.44,weight:1),(mean:0.45,weight:1),(mean:0.46,weight:1),(mean:0.47,weight:1),(mean:0.48,weight:1),(mean:0.49,weight:1),(mean:0.5,weight:1),(mean:0.51,weight:1),(mean:0.525,weight:2),(mean:0.545,weight:2),(mean:0.565,weight:2),(mean:0.585,weight:2),(mean:0.605,weight:2),(mean:0.625,weight:2),(mean:0.64,weight:1),(mean:0.655,weight:2),(mean:0.675,weight:2),(mean:0.69,weight:1),(mean:0.705,weight:2),(mean:0.72,weight:1),(mean:0.735,weight:2),(mean:0.75,weight:1),(mean:0.76,weight:1),(mean:0.775,weight:2),(mean:0.79,weight:1),(mean:0.8,weight:1),(mean:0.815,weight:2),(mean:0.83,weight:1),(mean:0.84,weight:1),(mean:0.85,weight:1),(mean:0.86,weight:1),(mean:0.87,weight:1),(mean:0.88,weight:1),(mean:0.89,weight:1),(mean:0.9,weight:1),(mean:0.91,weight:1),(mean:0.92,weight:1),(mean:0.93,weight:1),(mean:0.94,weight:1),(mean:0.95,weight:1),(mean:0.96,weight:1),(mean:0.97,weight:1),(mean:0.98,weight:1),(mean:0.99,weight:1),(mean:1,weight:1)])');
100+
```
101+
102+
```SQL
103+
SELECT
104+
min_val(tdigest),
105+
max_val(tdigest),
106+
num_vals(tdigest)
107+
FROM digest;
108+
```
109+
```output
110+
min_val | max_val | num_vals
111+
---------+---------+----------
112+
0.01 | 1 | 100
113+
```
114+
115+
Inserting serialized tdigest into table behaves the same:
116+
117+
```SQL
118+
SELECT
119+
min_val(tdigest),
120+
max_val(tdigest),
121+
num_vals(tdigest)
122+
FROM digest2;
123+
```
124+
```output
125+
min_val | max_val | num_vals
126+
---------+---------+----------
127+
0.01 | 1 | 100
128+
```

0 commit comments

Comments
 (0)