Skip to content

Commit 17e0442

Browse files
committed
refactor: extract common pattern into append_param
1 parent a7b55af commit 17e0442

File tree

1 file changed

+39
-57
lines changed

1 file changed

+39
-57
lines changed

src/sinks/clickhouse/service.rs

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder
110110
}
111111
}
112112

113+
fn append_param<T: ToString>(uri: &mut String, key: &str, value: Option<T>) {
114+
if let Some(val) = value {
115+
uri.push_str(&format!("{}={}&", key, val.to_string()));
116+
}
117+
}
118+
113119
fn set_uri_query(
114120
uri: &Uri,
115121
database: &str,
@@ -139,63 +145,39 @@ fn set_uri_query(
139145
}
140146

141147
uri.push_str("?input_format_import_nested_json=1&");
142-
if let Some(skip_unknown) = skip_unknown {
143-
if skip_unknown {
144-
uri.push_str("input_format_skip_unknown_fields=1&");
145-
} else {
146-
uri.push_str("input_format_skip_unknown_fields=0&")
147-
}
148-
}
149-
if date_time_best_effort {
150-
uri.push_str("date_time_input_format=best_effort&")
151-
}
152-
if insert_random_shard {
153-
uri.push_str("insert_distributed_one_random_shard=1&")
154-
}
155-
if let Some(async_insert) = query_settings.async_insert {
156-
if async_insert {
157-
uri.push_str("async_insert=1&");
158-
} else {
159-
uri.push_str("async_insert=0&")
160-
}
161-
}
162-
if let Some(wait_for_async_insert) = query_settings.wait_for_async_insert {
163-
if wait_for_async_insert {
164-
uri.push_str("wait_for_async_insert=1&");
165-
} else {
166-
uri.push_str("wait_for_async_insert=0&")
167-
}
168-
}
169-
if let Some(wait_for_async_insert_timeout) = query_settings.wait_for_async_insert_timeout {
170-
uri.push_str(
171-
format!(
172-
"wait_for_async_insert_timeout={}&",
173-
wait_for_async_insert_timeout,
174-
)
175-
.as_str(),
176-
);
177-
}
178-
if let Some(async_insert_deduplicate) = query_settings.async_insert_deduplicate {
179-
if async_insert_deduplicate {
180-
uri.push_str("async_insert_deduplicate=1&");
181-
} else {
182-
uri.push_str("async_insert_deduplicate=0&")
183-
}
184-
}
185-
if let Some(async_insert_max_data_size) = query_settings.async_insert_max_data_size {
186-
uri.push_str(
187-
format!("async_insert_max_data_size={}&", async_insert_max_data_size,).as_str(),
188-
);
189-
}
190-
if let Some(async_insert_max_query_number) = query_settings.async_insert_max_query_number {
191-
uri.push_str(
192-
format!(
193-
"async_insert_max_query_number={}&",
194-
async_insert_max_query_number,
195-
)
196-
.as_str(),
197-
);
198-
}
148+
append_param(&mut uri, "input_format_skip_unknown_fields", skip_unknown);
149+
append_param(
150+
&mut uri,
151+
"date_time_best_effort",
152+
Some(date_time_best_effort),
153+
);
154+
append_param(&mut uri, "insert_random_shard", Some(insert_random_shard));
155+
append_param(&mut uri, "async_insert", query_settings.async_insert);
156+
append_param(
157+
&mut uri,
158+
"wait_for_async_insert",
159+
query_settings.wait_for_async_insert,
160+
);
161+
append_param(
162+
&mut uri,
163+
"wait_for_async_insert_timeout",
164+
query_settings.wait_for_async_insert_timeout,
165+
);
166+
append_param(
167+
&mut uri,
168+
"async_insert_deduplicate",
169+
query_settings.async_insert_deduplicate,
170+
);
171+
append_param(
172+
&mut uri,
173+
"async_insert_max_data_size",
174+
query_settings.async_insert_max_data_size,
175+
);
176+
append_param(
177+
&mut uri,
178+
"async_insert_max_query_number",
179+
query_settings.async_insert_max_query_number,
180+
);
199181
uri.push_str(query.as_str());
200182

201183
uri.parse::<Uri>()

0 commit comments

Comments
 (0)