Skip to content

Commit f83b7e0

Browse files
pm5prontthomasqueirozb
authored
feat(clickhouse sink): add query_settings to clickhouse sink (#22764)
* feat(sink: clickhouse): add query_settings option to clickhouse sink * Rename query settings struct * config UX improvements * Update changelog.d/22764_clickhouse_query_settings.feature.md * Fix spelling * cargo fmt * regen docs --------- Co-authored-by: Pavlos Rontidis <[email protected]> Co-authored-by: Thomas <[email protected]>
1 parent a9366ad commit f83b7e0

File tree

7 files changed

+211
-12
lines changed

7 files changed

+211
-12
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add a `query_settings` and `async_insert_settings` option in the `clickhouse` sink, which allows users to configure asynchronous inserts.
2+
3+
authors: pm5

src/sinks/clickhouse/config.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,62 @@ pub struct ClickhouseConfig {
114114
skip_serializing_if = "crate::serde::is_default"
115115
)]
116116
pub acknowledgements: AcknowledgementsConfig,
117+
118+
#[configurable(derived)]
119+
#[serde(default)]
120+
pub query_settings: QuerySettingsConfig,
121+
}
122+
123+
/// Query settings for the `clickhouse` sink.
124+
#[configurable_component]
125+
#[derive(Clone, Copy, Debug, Default)]
126+
#[serde(deny_unknown_fields)]
127+
pub struct QuerySettingsConfig {
128+
/// Async insert-related settings.
129+
#[serde(default)]
130+
pub async_insert_settings: AsyncInsertSettingsConfig,
131+
}
132+
133+
/// Async insert related settings for the `clickhouse` sink.
134+
#[configurable_component]
135+
#[derive(Clone, Copy, Debug, Default)]
136+
#[serde(deny_unknown_fields)]
137+
pub struct AsyncInsertSettingsConfig {
138+
/// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
139+
///
140+
/// If left unspecified, use the default provided by the `ClickHouse` server.
141+
#[serde(default)]
142+
pub enabled: Option<bool>,
143+
144+
/// Sets `wait_for`, allowing ClickHouse to wait for processing of asynchronous insertion.
145+
///
146+
/// If left unspecified, use the default provided by the `ClickHouse` server.
147+
#[serde(default)]
148+
pub wait_for_processing: Option<bool>,
149+
150+
/// Sets 'wait_for_processing_timeout`, to control the timeout for waiting for processing asynchronous insertion.
151+
///
152+
/// If left unspecified, use the default provided by the `ClickHouse` server.
153+
#[serde(default)]
154+
pub wait_for_processing_timeout: Option<u64>,
155+
156+
/// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
157+
///
158+
/// If left unspecified, use the default provided by the `ClickHouse` server.
159+
#[serde(default)]
160+
pub deduplicate: Option<bool>,
161+
162+
/// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
163+
///
164+
/// If left unspecified, use the default provided by the `ClickHouse` server.
165+
#[serde(default)]
166+
pub max_data_size: Option<u64>,
167+
168+
/// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted
169+
///
170+
/// If left unspecified, use the default provided by the `ClickHouse` server.
171+
#[serde(default)]
172+
pub max_query_number: Option<u64>,
117173
}
118174

119175
impl_generate_config_from_default!(ClickhouseConfig);
@@ -137,6 +193,7 @@ impl SinkConfig for ClickhouseConfig {
137193
date_time_best_effort: self.date_time_best_effort,
138194
insert_random_shard: self.insert_random_shard,
139195
compression: self.compression,
196+
query_settings: self.query_settings,
140197
};
141198

142199
let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =

src/sinks/clickhouse/integration_tests.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event,
1919
use vector_lib::lookup::PathPrefix;
2020
use warp::Filter;
2121

22-
use super::*;
2322
use crate::{
2423
codecs::{TimestampFormat, Transformer},
2524
config::{log_schema, SinkConfig, SinkContext},
26-
sinks::util::{BatchConfig, Compression, TowerRequestConfig},
25+
sinks::{
26+
clickhouse::config::ClickhouseConfig,
27+
util::{BatchConfig, Compression, TowerRequestConfig},
28+
},
2729
test_util::{
2830
components::{run_and_assert_sink_compliance, SINK_TAGS},
2931
random_table_name, trace_init,

src/sinks/clickhouse/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
//!
1010
//! This sink only supports logs for now but could support metrics and traces as well in the future.
1111
12-
mod config;
12+
pub mod config;
1313
#[cfg(all(test, feature = "clickhouse-integration-tests"))]
1414
mod integration_tests;
1515
mod request_builder;
1616
mod service;
1717
mod sink;
18-
pub use self::config::ClickhouseConfig;

src/sinks/clickhouse/service.rs

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Service implementation for the `Clickhouse` sink.
22
3+
use super::config::QuerySettingsConfig;
34
use super::sink::PartitionKey;
45
use crate::{
56
http::{Auth, HttpError},
@@ -67,6 +68,7 @@ pub(super) struct ClickhouseServiceRequestBuilder {
6768
pub(super) date_time_best_effort: bool,
6869
pub(super) insert_random_shard: bool,
6970
pub(super) compression: Compression,
71+
pub(super) query_settings: QuerySettingsConfig,
7072
}
7173

7274
impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder {
@@ -84,6 +86,7 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder
8486
self.skip_unknown_fields,
8587
self.date_time_best_effort,
8688
self.insert_random_shard,
89+
self.query_settings,
8790
)?;
8891

8992
let auth: Option<Auth> = self.auth.clone();
@@ -107,6 +110,18 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder
107110
}
108111
}
109112

113+
fn append_param<T: ToString>(uri: &mut String, key: &str, value: Option<T>) {
114+
if let Some(val) = value {
115+
uri.push_str(&format!("{}={}&", key, val.to_string()));
116+
}
117+
}
118+
fn append_param_bool(uri: &mut String, key: &str, value: Option<bool>) {
119+
if let Some(val) = value {
120+
uri.push_str(&format!("{}={}&", key, if val { 1 } else { 0 }));
121+
}
122+
}
123+
124+
#[allow(clippy::too_many_arguments)]
110125
fn set_uri_query(
111126
uri: &Uri,
112127
database: &str,
@@ -115,6 +130,7 @@ fn set_uri_query(
115130
skip_unknown: Option<bool>,
116131
date_time_best_effort: bool,
117132
insert_random_shard: bool,
133+
query_settings: QuerySettingsConfig,
118134
) -> crate::Result<Uri> {
119135
let query = url::form_urlencoded::Serializer::new(String::new())
120136
.append_pair(
@@ -135,19 +151,45 @@ fn set_uri_query(
135151
}
136152

137153
uri.push_str("?input_format_import_nested_json=1&");
138-
if let Some(skip_unknown) = skip_unknown {
139-
if skip_unknown {
140-
uri.push_str("input_format_skip_unknown_fields=1&");
141-
} else {
142-
uri.push_str("input_format_skip_unknown_fields=0&")
143-
}
144-
}
154+
append_param_bool(&mut uri, "input_format_skip_unknown_fields", skip_unknown);
145155
if date_time_best_effort {
146156
uri.push_str("date_time_input_format=best_effort&")
147157
}
148158
if insert_random_shard {
149159
uri.push_str("insert_distributed_one_random_shard=1&")
150160
}
161+
append_param_bool(
162+
&mut uri,
163+
"async_insert",
164+
query_settings.async_insert_settings.enabled,
165+
);
166+
append_param_bool(
167+
&mut uri,
168+
"wait_for_async_insert",
169+
query_settings.async_insert_settings.wait_for_processing,
170+
);
171+
append_param(
172+
&mut uri,
173+
"wait_for_async_insert_timeout",
174+
query_settings
175+
.async_insert_settings
176+
.wait_for_processing_timeout,
177+
);
178+
append_param_bool(
179+
&mut uri,
180+
"async_insert_deduplicate",
181+
query_settings.async_insert_settings.deduplicate,
182+
);
183+
append_param(
184+
&mut uri,
185+
"async_insert_max_data_size",
186+
query_settings.async_insert_settings.max_data_size,
187+
);
188+
append_param(
189+
&mut uri,
190+
"async_insert_max_query_number",
191+
query_settings.async_insert_settings.max_query_number,
192+
);
151193
uri.push_str(query.as_str());
152194

153195
uri.parse::<Uri>()
@@ -158,6 +200,7 @@ fn set_uri_query(
158200
#[cfg(test)]
159201
mod tests {
160202
use super::*;
203+
use crate::sinks::clickhouse::config::*;
161204

162205
#[test]
163206
fn encode_valid() {
@@ -169,6 +212,7 @@ mod tests {
169212
Some(false),
170213
true,
171214
false,
215+
QuerySettingsConfig::default(),
172216
)
173217
.unwrap();
174218
assert_eq!(uri.to_string(), "http://localhost:80/?\
@@ -185,6 +229,7 @@ mod tests {
185229
Some(false),
186230
false,
187231
false,
232+
QuerySettingsConfig::default(),
188233
)
189234
.unwrap();
190235
assert_eq!(uri.to_string(), "http://localhost:80/?\
@@ -200,6 +245,7 @@ mod tests {
200245
Some(true),
201246
true,
202247
false,
248+
QuerySettingsConfig::default(),
203249
)
204250
.unwrap();
205251
assert_eq!(uri.to_string(), "http://localhost:80/?\
@@ -216,11 +262,38 @@ mod tests {
216262
None,
217263
true,
218264
false,
265+
QuerySettingsConfig::default(),
266+
)
267+
.unwrap();
268+
assert_eq!(uri.to_string(), "http://localhost:80/?\
269+
input_format_import_nested_json=1&\
270+
date_time_input_format=best_effort&\
271+
query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONAsObject");
272+
273+
let uri = set_uri_query(
274+
&"http://localhost:80".parse().unwrap(),
275+
"my_database",
276+
"my_\"table\"",
277+
Format::JsonAsObject,
278+
None,
279+
true,
280+
false,
281+
QuerySettingsConfig {
282+
async_insert_settings: AsyncInsertSettingsConfig {
283+
enabled: Some(true),
284+
wait_for_processing: Some(true),
285+
wait_for_processing_timeout: Some(500),
286+
..AsyncInsertSettingsConfig::default()
287+
},
288+
},
219289
)
220290
.unwrap();
221291
assert_eq!(uri.to_string(), "http://localhost:80/?\
222292
input_format_import_nested_json=1&\
223293
date_time_input_format=best_effort&\
294+
async_insert=1&\
295+
wait_for_async_insert=1&\
296+
wait_for_async_insert_timeout=500&\
224297
query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONAsObject");
225298
}
226299

@@ -234,6 +307,7 @@ mod tests {
234307
Some(false),
235308
false,
236309
false,
310+
QuerySettingsConfig::default(),
237311
)
238312
.unwrap_err();
239313
}

website/cue/reference/components/sinks/generated/clickhouse.cue

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,70 @@ generated: components: sinks: clickhouse: configuration: {
337337
required: false
338338
type: bool: default: false
339339
}
340+
query_settings: {
341+
description: "Query settings for the `clickhouse` sink."
342+
required: false
343+
type: object: options: async_insert_settings: {
344+
description: "Async insert-related settings."
345+
required: false
346+
type: object: options: {
347+
deduplicate: {
348+
description: """
349+
Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
350+
351+
If left unspecified, use the default provided by the `ClickHouse` server.
352+
"""
353+
required: false
354+
type: bool: {}
355+
}
356+
enabled: {
357+
description: """
358+
Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
359+
360+
If left unspecified, use the default provided by the `ClickHouse` server.
361+
"""
362+
required: false
363+
type: bool: {}
364+
}
365+
max_data_size: {
366+
description: """
367+
Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
368+
369+
If left unspecified, use the default provided by the `ClickHouse` server.
370+
"""
371+
required: false
372+
type: uint: {}
373+
}
374+
max_query_number: {
375+
description: """
376+
Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted
377+
378+
If left unspecified, use the default provided by the `ClickHouse` server.
379+
"""
380+
required: false
381+
type: uint: {}
382+
}
383+
wait_for_processing: {
384+
description: """
385+
Sets `wait_for`, allowing ClickHouse to wait for processing of asynchronous insertion.
386+
387+
If left unspecified, use the default provided by the `ClickHouse` server.
388+
"""
389+
required: false
390+
type: bool: {}
391+
}
392+
wait_for_processing_timeout: {
393+
description: """
394+
Sets 'wait_for_processing_timeout`, to control the timeout for waiting for processing asynchronous insertion.
395+
396+
If left unspecified, use the default provided by the `ClickHouse` server.
397+
"""
398+
required: false
399+
type: uint: {}
400+
}
401+
}
402+
}
403+
}
340404
request: {
341405
description: """
342406
Middleware settings for outbound requests.

website/cue/reference/remap/functions/replace.cue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ remap: functions: replace: {
7272
title: "Replace with capture groups when not set in the configuration file (use `$$num` in config files)"
7373
source: #"""
7474
# Note that in the context of Vector configuration files, an extra `$` escape character is required
75-
# (i.e. `$$num`) to avoid interepreting `num` as an environment variable.
75+
# (i.e. `$$num`) to avoid interpreting `num` as an environment variable.
7676
replace("foo123bar", r'foo(?P<num>\d+)bar', "$num")
7777
"""#
7878
return: "123"

0 commit comments

Comments
 (0)