Skip to content

Commit b972eb9

Browse files
authored
feat: add support for contextual vars in SQL (#6791)
1 parent 898d2a8 commit b972eb9

File tree

10 files changed

+121
-23
lines changed

10 files changed

+121
-23
lines changed

backend/windmill-worker/src/bigquery_executor.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use serde::Deserialize;
1919

2020
use crate::common::{build_args_values, resolve_job_timeout};
2121
use crate::common::{
22-
build_http_client, s3_mode_args_to_worker_data, OccupancyMetrics, S3ModeWorkerData,
22+
build_http_client, get_reserved_variables, s3_mode_args_to_worker_data, OccupancyMetrics,
23+
S3ModeWorkerData,
2324
};
2425
use crate::handle_child::run_future_with_polling_update_job_poller;
2526
use crate::sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args;
@@ -313,6 +314,7 @@ pub async fn do_bigquery(
313314
worker_name: &str,
314315
column_order: &mut Option<Vec<String>>,
315316
occupancy_metrics: &mut OccupancyMetrics,
317+
parent_runnable_path: Option<String>,
316318
) -> windmill_common::error::Result<Box<RawValue>> {
317319
let bigquery_args = build_args_values(job, client, conn).await?;
318320

@@ -364,8 +366,15 @@ pub async fn do_bigquery(
364366
.map_err(|x| Error::ExecutionErr(x.to_string()))?
365367
.args;
366368

367-
let (query, args_to_skip) =
368-
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &bigquery_args)?;
369+
let reserved_variables =
370+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
371+
372+
let (query, args_to_skip) = &sanitize_and_interpolate_unsafe_sql_args(
373+
query,
374+
&sig,
375+
&bigquery_args,
376+
&reserved_variables,
377+
)?;
369378

370379
let queries = parse_sql_blocks(query);
371380

backend/windmill-worker/src/duckdb_executor.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use windmill_parser_sql::{parse_duckdb_sig, parse_sql_blocks};
1818
use windmill_queue::{CanceledBy, MiniPulledJob};
1919

2020
use crate::agent_workers::get_ducklake_from_agent_http;
21-
use crate::common::{build_args_values, OccupancyMetrics};
21+
use crate::common::{build_args_values, get_reserved_variables, OccupancyMetrics};
2222
use crate::handle_child::run_future_with_polling_update_job_poller;
2323
#[cfg(feature = "mysql")]
2424
use crate::mysql_executor::MysqlDatabase;
@@ -37,6 +37,7 @@ pub async fn do_duckdb(
3737
// TODO
3838
#[allow(unused_variables)] column_order_ref: &mut Option<Vec<String>>,
3939
occupancy_metrics: &mut OccupancyMetrics,
40+
parent_runnable_path: Option<String>,
4041
) -> Result<Box<RawValue>> {
4142
let token = client.token.clone();
4243
let hidden_passwords = Arc::new(Mutex::new(Vec::<String>::new()));
@@ -48,7 +49,11 @@ pub async fn do_duckdb(
4849
let sig = parse_duckdb_sig(query)?.args;
4950
let mut job_args = build_args_values(job, client, conn).await?;
5051

51-
let (query, _) = &sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args)?;
52+
let reserved_variables =
53+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
54+
55+
let (query, _) =
56+
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args, &reserved_variables)?;
5257
let query = transform_s3_uris(query).await?;
5358

5459
let job_args = {

backend/windmill-worker/src/mssql_executor.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use windmill_parser_sql::{parse_db_resource, parse_mssql_sig, parse_s3_mode};
2121
use windmill_queue::MiniPulledJob;
2222
use windmill_queue::{append_logs, CanceledBy};
2323

24-
use crate::common::{build_args_values, s3_mode_args_to_worker_data, OccupancyMetrics};
24+
use crate::common::{
25+
build_args_values, get_reserved_variables, s3_mode_args_to_worker_data, OccupancyMetrics,
26+
};
2527
use crate::handle_child::run_future_with_polling_update_job_poller;
2628
use crate::sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args;
2729
use windmill_common::client::AuthedClient;
@@ -56,23 +58,25 @@ lazy_static::lazy_static! {
5658

5759
pub async fn do_mssql(
5860
job: &MiniPulledJob,
59-
client: &AuthedClient,
61+
authed_client: &AuthedClient,
6062
query: &str,
6163
conn: &Connection,
6264
mem_peak: &mut i32,
6365
canceled_by: &mut Option<CanceledBy>,
6466
worker_name: &str,
6567
occupancy_metrics: &mut OccupancyMetrics,
6668
job_dir: &str,
69+
parent_runnable_path: Option<String>,
6770
) -> error::Result<Box<RawValue>> {
68-
let mssql_args = build_args_values(job, client, conn).await?;
71+
let mssql_args = build_args_values(job, authed_client, conn).await?;
6972

7073
let inline_db_res_path = parse_db_resource(&query);
71-
let s3 = parse_s3_mode(&query)?.map(|s3| s3_mode_args_to_worker_data(s3, client.clone(), job));
74+
let s3 = parse_s3_mode(&query)?
75+
.map(|s3| s3_mode_args_to_worker_data(s3, authed_client.clone(), job));
7276

7377
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
7478
Some(
75-
client
79+
authed_client
7680
.get_resource_value_interpolated::<serde_json::Value>(
7781
&inline_db_res_path,
7882
Some(job.id.to_string()),
@@ -189,8 +193,11 @@ pub async fn do_mssql(
189193
.map_err(|x| Error::ExecutionErr(x.to_string()))?
190194
.args;
191195

196+
let reserved_variables =
197+
get_reserved_variables(job, &authed_client.token, conn, parent_runnable_path).await?;
198+
192199
let (query, args_to_skip) =
193-
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &mssql_args)?;
200+
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &mssql_args, &reserved_variables)?;
194201

195202
let mut prepared_query = Query::new(query.to_owned());
196203
for arg in &sig {

backend/windmill-worker/src/mysql_executor.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use windmill_queue::CanceledBy;
2626
use windmill_queue::MiniPulledJob;
2727

2828
use crate::{
29-
common::{build_args_values, s3_mode_args_to_worker_data, OccupancyMetrics, S3ModeWorkerData},
29+
common::{
30+
build_args_values, get_reserved_variables, s3_mode_args_to_worker_data, OccupancyMetrics,
31+
S3ModeWorkerData,
32+
},
3033
handle_child::run_future_with_polling_update_job_poller,
3134
sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args,
3235
};
@@ -149,6 +152,7 @@ pub async fn do_mysql(
149152
worker_name: &str,
150153
column_order: &mut Option<Vec<String>>,
151154
occupancy_metrics: &mut OccupancyMetrics,
155+
parent_runnable_path: Option<String>,
152156
) -> windmill_common::error::Result<Box<RawValue>> {
153157
let job_args = build_args_values(job, client, conn).await?;
154158

@@ -198,7 +202,11 @@ pub async fn do_mysql(
198202
.map_err(|x| Error::ExecutionErr(x.to_string()))?
199203
.args;
200204

201-
let (query, args_to_skip) = &sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args)?;
205+
let reserved_variables =
206+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
207+
208+
let (query, args_to_skip) =
209+
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args, &reserved_variables)?;
202210

203211
let using_named_params = RE_ARG_MYSQL_NAMED.captures_iter(query).count() > 0;
204212

backend/windmill-worker/src/oracledb_executor.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ use windmill_queue::CanceledBy;
2323

2424
use crate::{
2525
common::{
26-
build_args_values, check_executor_binary_exists, s3_mode_args_to_worker_data,
27-
OccupancyMetrics, S3ModeWorkerData,
26+
build_args_values, check_executor_binary_exists, get_reserved_variables,
27+
s3_mode_args_to_worker_data, OccupancyMetrics, S3ModeWorkerData,
2828
},
2929
handle_child::run_future_with_polling_update_job_poller,
30-
sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args
30+
sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args,
3131
};
3232
use windmill_common::client::AuthedClient;
3333

@@ -343,6 +343,7 @@ pub async fn do_oracledb(
343343
worker_name: &str,
344344
column_order: &mut Option<Vec<String>>,
345345
occupancy_metrics: &mut OccupancyMetrics,
346+
parent_runnable_path: Option<String>,
346347
) -> windmill_common::error::Result<Box<RawValue>> {
347348
check_executor_binary_exists(
348349
"the Oracle client lib",
@@ -381,7 +382,11 @@ pub async fn do_oracledb(
381382
.map_err(|x| Error::ExecutionErr(x.to_string()))?
382383
.args;
383384

384-
let (query, args_to_skip) = sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args)?;
385+
let reserved_variables =
386+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
387+
388+
let (query, args_to_skip) =
389+
sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args, &reserved_variables)?;
385390

386391
let (statement_values, errors) = get_statement_values(sig.clone(), &job_args, &args_to_skip);
387392

backend/windmill-worker/src/pg_executor.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ use windmill_parser_sql::{
3737
use windmill_queue::{CanceledBy, MiniPulledJob};
3838

3939
use crate::common::{
40-
build_args_values, s3_mode_args_to_worker_data, sizeof_val, OccupancyMetrics, S3ModeWorkerData,
40+
build_args_values, get_reserved_variables, s3_mode_args_to_worker_data, sizeof_val,
41+
OccupancyMetrics, S3ModeWorkerData,
4142
};
4243
use crate::handle_child::run_future_with_polling_update_job_poller;
4344
use crate::sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args;
@@ -186,6 +187,7 @@ pub async fn do_postgresql(
186187
worker_name: &str,
187188
column_order: &mut Option<Vec<String>>,
188189
occupancy_metrics: &mut OccupancyMetrics,
190+
parent_runnable_path: Option<String>,
189191
) -> error::Result<Box<RawValue>> {
190192
let pg_args = build_args_values(job, client, conn).await?;
191193

@@ -312,7 +314,11 @@ pub async fn do_postgresql(
312314

313315
let sig = parse_pgsql_sig(&query).map_err(|x| Error::ExecutionErr(x.to_string()))?;
314316

315-
let (query, _) = &sanitize_and_interpolate_unsafe_sql_args(query, &sig.args, &pg_args)?;
317+
let reserved_variables =
318+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
319+
320+
let (query, _) =
321+
&sanitize_and_interpolate_unsafe_sql_args(query, &sig.args, &pg_args, &reserved_variables)?;
316322

317323
let queries = parse_sql_blocks(query);
318324

backend/windmill-worker/src/sanitized_sql_params.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
use anyhow::anyhow;
2-
use std::collections::HashMap;
2+
use regex::Regex;
3+
use std::collections::{HashMap, HashSet};
34

45
use serde_json::Value;
56
use windmill_common::error;
67
use windmill_parser::Arg;
78
use windmill_parser_sql::{SANITIZED_ENUM_STR, SANITIZED_RAW_STRING_STR};
89

10+
lazy_static::lazy_static! {
11+
static ref RE_SQL_CONTEXTUAL_VAR: Regex = Regex::new(r"%%WM_[A-Z_]+%%").unwrap();
12+
}
13+
914
/// Identifier must be a continuous ASCII alphanumeric word, not starting with
1015
/// a number, that can contain underscores
1116
fn sanitize_identifier(arg: &Arg, input: &str) -> Result<(), error::Error> {
@@ -28,14 +33,39 @@ fn sanitize_identifier(arg: &Arg, input: &str) -> Result<(), error::Error> {
2833
}
2934
}
3035

36+
fn replace_contextual_variables(
37+
code: &mut String,
38+
contextual_variables: &HashMap<String, String>,
39+
) -> () {
40+
let vars = RE_SQL_CONTEXTUAL_VAR
41+
.find_iter(&code)
42+
.map(|m| m.as_str().to_string())
43+
.collect::<HashSet<_>>();
44+
45+
for var_pattern in vars {
46+
let var_name = var_pattern
47+
.strip_prefix("%%")
48+
.unwrap()
49+
.strip_suffix("%%")
50+
.unwrap();
51+
let var_value = contextual_variables.get(var_name);
52+
if let Some(var_value) = var_value {
53+
*code = code.replace(&var_pattern, var_value);
54+
}
55+
}
56+
}
57+
3158
pub fn sanitize_and_interpolate_unsafe_sql_args(
3259
code: &str,
3360
args: &Vec<Arg>,
3461
args_map: &HashMap<String, Value>,
62+
contextual_variables: &HashMap<String, String>,
3563
) -> Result<(String, Vec<String>), error::Error> {
3664
let mut ret = code.to_string();
3765
let mut args_to_skip = vec![];
3866

67+
replace_contextual_variables(&mut ret, contextual_variables);
68+
3969
for arg in args {
4070
if let Some(typ) = &arg.otyp {
4171
let pattern = format!("%%{}%%", arg.name);

backend/windmill-worker/src/snowflake_executor.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use windmill_queue::{CanceledBy, MiniPulledJob, HTTP_CLIENT};
2020

2121
use serde::{Deserialize, Serialize};
2222

23-
use crate::common::build_args_values;
23+
use crate::common::{build_args_values, get_reserved_variables};
2424
use crate::common::{
2525
build_http_client, resolve_job_timeout, s3_mode_args_to_worker_data, OccupancyMetrics,
2626
S3ModeWorkerData,
@@ -132,12 +132,14 @@ fn do_snowflake_inner<'a>(
132132
skip_collect: bool,
133133
http_client: &'a Client,
134134
s3: Option<S3ModeWorkerData>,
135+
reserved_variables: &HashMap<String, String>,
135136
) -> windmill_common::error::Result<BoxFuture<'a, windmill_common::error::Result<Box<RawValue>>>> {
136137
let sig = parse_snowflake_sig(&query)
137138
.map_err(|x| Error::ExecutionErr(x.to_string()))?
138139
.args;
139140

140-
let (query, args_to_skip) = &sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args)?;
141+
let (query, args_to_skip) =
142+
&sanitize_and_interpolate_unsafe_sql_args(query, &sig, &job_args, reserved_variables)?;
141143

142144
body.insert("statement".to_string(), json!(query));
143145

@@ -282,6 +284,7 @@ pub async fn do_snowflake(
282284
worker_name: &str,
283285
column_order: &mut Option<Vec<String>>,
284286
occupancy_metrics: &mut OccupancyMetrics,
287+
parent_runnable_path: Option<String>,
285288
) -> windmill_common::error::Result<Box<RawValue>> {
286289
let snowflake_args = build_args_values(job, client, conn).await?;
287290

@@ -403,6 +406,9 @@ pub async fn do_snowflake(
403406

404407
let http_client = build_http_client(timeout_duration)?;
405408

409+
let reserved_variables =
410+
get_reserved_variables(job, &client.token, conn, parent_runnable_path).await?;
411+
406412
let result_f = if queries.len() > 1 {
407413
let futures = queries
408414
.iter()
@@ -419,6 +425,7 @@ pub async fn do_snowflake(
419425
annotations.return_last_result && i < queries.len() - 1,
420426
&http_client,
421427
s3.clone(),
428+
&reserved_variables,
422429
)
423430
})
424431
.collect::<windmill_common::error::Result<Vec<_>>>()?;
@@ -449,6 +456,7 @@ pub async fn do_snowflake(
449456
false,
450457
&http_client,
451458
s3.clone(),
459+
&reserved_variables,
452460
)?
453461
};
454462
let r = run_future_with_polling_update_job_poller(

backend/windmill-worker/src/worker.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3016,6 +3016,7 @@ async fn handle_code_execution_job(
30163016
worker_name,
30173017
column_order,
30183018
occupancy_metrics,
3019+
parent_runnable_path,
30193020
)
30203021
.await;
30213022
} else if language == Some(ScriptLang::Mysql) {
@@ -3035,6 +3036,7 @@ async fn handle_code_execution_job(
30353036
worker_name,
30363037
column_order,
30373038
occupancy_metrics,
3039+
parent_runnable_path,
30383040
)
30393041
.await;
30403042
} else if language == Some(ScriptLang::Bigquery) {
@@ -3065,6 +3067,7 @@ async fn handle_code_execution_job(
30653067
worker_name,
30663068
column_order,
30673069
occupancy_metrics,
3070+
parent_runnable_path,
30683071
)
30693072
.await;
30703073
}
@@ -3088,6 +3091,7 @@ async fn handle_code_execution_job(
30883091
worker_name,
30893092
column_order,
30903093
occupancy_metrics,
3094+
parent_runnable_path,
30913095
)
30923096
.await;
30933097
}
@@ -3119,6 +3123,7 @@ async fn handle_code_execution_job(
31193123
worker_name,
31203124
occupancy_metrics,
31213125
job_dir,
3126+
parent_runnable_path,
31223127
)
31233128
.await;
31243129
}
@@ -3150,6 +3155,7 @@ async fn handle_code_execution_job(
31503155
worker_name,
31513156
column_order,
31523157
occupancy_metrics,
3158+
parent_runnable_path,
31533159
)
31543160
.await;
31553161
}
@@ -3174,6 +3180,7 @@ async fn handle_code_execution_job(
31743180
worker_name,
31753181
column_order,
31763182
occupancy_metrics,
3183+
parent_runnable_path,
31773184
)
31783185
.await;
31793186
}

0 commit comments

Comments
 (0)