Skip to content

Commit a308782

Browse files
authored
fix: websocket runnable #6675
1 parent 6f15459 commit a308782

File tree

3 files changed

+12
-44
lines changed

3 files changed

+12
-44
lines changed

backend/windmill-api/src/triggers/websocket/handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio_tungstenite::connect_async;
1212
use windmill_common::{
1313
db::UserDB,
1414
error::{Error, Result},
15+
worker::to_raw_value,
1516
};
1617
use windmill_git_sync::DeployedObject;
1718

@@ -236,7 +237,7 @@ impl TriggerCrud for WebsocketTrigger {
236237
url.starts_with("$flow:"),
237238
&db,
238239
authed.clone(),
239-
config.url_runnable_args.as_ref(),
240+
config.url_runnable_args.as_ref().map(to_raw_value).as_ref(),
240241
&workspace_id,
241242
)
242243
.await?,

backend/windmill-api/src/triggers/websocket/listener.rs

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::triggers::{
55
trigger_runnable, trigger_runnable_and_wait_for_raw_result,
66
trigger_runnable_and_wait_for_raw_result_with_error_ctx, TriggerJobArgs,
77
},
8-
websocket::WebsocketConfig,
8+
websocket::{get_url_from_runnable_value, WebsocketConfig},
99
Listener,
1010
};
1111
use anyhow::Context;
@@ -31,43 +31,6 @@ use windmill_common::{
3131
use windmill_queue::PushArgsOwned;
3232

3333
impl ListeningTrigger<WebsocketConfig> {
34-
async fn get_url_from_runnable(&self, db: &DB) -> Result<String> {
35-
let runnable_kind = if self.is_flow { "Flow" } else { "Script" };
36-
tracing::info!(
37-
"Running {} {} to get WebSocket URL",
38-
runnable_kind.to_lowercase(),
39-
self.path
40-
);
41-
42-
let authed = self.authed(db, "ws").await?;
43-
44-
let args = raw_value_to_args_hashmap(
45-
self.trigger_config.url_runnable_args.as_ref().map(|r| &r.0),
46-
)?;
47-
48-
let result = trigger_runnable_and_wait_for_raw_result_with_error_ctx(
49-
db,
50-
None,
51-
authed,
52-
&self.workspace_id,
53-
&self.script_path,
54-
self.is_flow,
55-
PushArgsOwned { args, extra: None },
56-
None,
57-
None,
58-
None,
59-
"".to_string(), // doesn't matter as no retry/error handler
60-
)
61-
.await?;
62-
63-
serde_json::from_str::<String>(result.get()).map_err(|_| {
64-
Error::BadConfig(format!(
65-
"{} {} did not return a string",
66-
runnable_kind, self.path,
67-
))
68-
})
69-
}
70-
7134
async fn send_initial_messages(
7235
&self,
7336
writer: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
@@ -189,8 +152,12 @@ impl Listener for WebsocketTrigger {
189152
)) => {
190153
return Ok(None);
191154
},
192-
193-
url_result = listening_trigger.get_url_from_runnable(&db) => match url_result {
155+
url_result = {
156+
let authed = listening_trigger.authed(db, "ws").await?;
157+
let args = listening_trigger.trigger_config.url_runnable_args.as_ref().map(|r| &r.0);
158+
let path = url.splitn(2, ':').nth(1).unwrap();
159+
get_url_from_runnable_value(path, url.starts_with("$flow:"), db, authed, args, &listening_trigger.workspace_id)
160+
} => match url_result {
194161
Ok(url) => Cow::Owned(url),
195162
Err(err) => {
196163
return Err(anyhow::anyhow!("Error getting WebSocket URL from runnable after 5 tries: {:?}", err).into());

backend/windmill-api/src/triggers/websocket/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ pub struct TestWebsocketConfig {
6161
}
6262

6363
pub fn value_to_args_hashmap(
64-
args: Option<&serde_json::Value>,
64+
args: Option<&Box<RawValue>>,
6565
) -> Result<HashMap<String, Box<RawValue>>> {
6666
let args = if let Some(args) = args {
6767
let args_map: Option<HashMap<String, serde_json::Value>> =
68-
serde_json::from_value(args.clone())
68+
serde_json::from_str(args.get())
6969
.map_err(|e| Error::BadRequest(format!("invalid json: {}", e)))?;
7070

7171
args_map
@@ -89,7 +89,7 @@ pub async fn get_url_from_runnable_value(
8989
is_flow: bool,
9090
db: &DB,
9191
authed: ApiAuthed,
92-
args: Option<&serde_json::Value>,
92+
args: Option<&Box<RawValue>>,
9393
workspace_id: &str,
9494
) -> Result<String> {
9595
tracing::info!(

0 commit comments

Comments
 (0)