Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/windmill-api/src/triggers/websocket/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio_tungstenite::connect_async;
use windmill_common::{
db::UserDB,
error::{Error, Result},
worker::to_raw_value,
};
use windmill_git_sync::DeployedObject;

Expand Down Expand Up @@ -236,7 +237,7 @@ impl TriggerCrud for WebsocketTrigger {
url.starts_with("$flow:"),
&db,
authed.clone(),
config.url_runnable_args.as_ref(),
config.url_runnable_args.as_ref().map(to_raw_value).as_ref(),
&workspace_id,
)
.await?,
Expand Down
47 changes: 7 additions & 40 deletions backend/windmill-api/src/triggers/websocket/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::triggers::{
trigger_runnable, trigger_runnable_and_wait_for_raw_result,
trigger_runnable_and_wait_for_raw_result_with_error_ctx, TriggerJobArgs,
},
websocket::WebsocketConfig,
websocket::{get_url_from_runnable_value, WebsocketConfig},
Listener,
};
use anyhow::Context;
Expand All @@ -31,43 +31,6 @@ use windmill_common::{
use windmill_queue::PushArgsOwned;

impl ListeningTrigger<WebsocketConfig> {
async fn get_url_from_runnable(&self, db: &DB) -> Result<String> {
let runnable_kind = if self.is_flow { "Flow" } else { "Script" };
tracing::info!(
"Running {} {} to get WebSocket URL",
runnable_kind.to_lowercase(),
self.path
);

let authed = self.authed(db, "ws").await?;

let args = raw_value_to_args_hashmap(
self.trigger_config.url_runnable_args.as_ref().map(|r| &r.0),
)?;

let result = trigger_runnable_and_wait_for_raw_result_with_error_ctx(
db,
None,
authed,
&self.workspace_id,
&self.script_path,
self.is_flow,
PushArgsOwned { args, extra: None },
None,
None,
None,
"".to_string(), // doesn't matter as no retry/error handler
)
.await?;

serde_json::from_str::<String>(result.get()).map_err(|_| {
Error::BadConfig(format!(
"{} {} did not return a string",
runnable_kind, self.path,
))
})
}

async fn send_initial_messages(
&self,
writer: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
Expand Down Expand Up @@ -189,8 +152,12 @@ impl Listener for WebsocketTrigger {
)) => {
return Ok(None);
},

url_result = listening_trigger.get_url_from_runnable(&db) => match url_result {
url_result = {
let authed = listening_trigger.authed(db, "ws").await?;
let args = listening_trigger.trigger_config.url_runnable_args.as_ref().map(|r| &r.0);
let path = url.splitn(2, ':').nth(1).unwrap();
get_url_from_runnable_value(path, url.starts_with("$flow:"), db, authed, args, &listening_trigger.workspace_id)
} => match url_result {
Ok(url) => Cow::Owned(url),
Err(err) => {
return Err(anyhow::anyhow!("Error getting WebSocket URL from runnable after 5 tries: {:?}", err).into());
Expand Down
6 changes: 3 additions & 3 deletions backend/windmill-api/src/triggers/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ pub struct TestWebsocketConfig {
}

pub fn value_to_args_hashmap(
args: Option<&serde_json::Value>,
args: Option<&Box<RawValue>>,
) -> Result<HashMap<String, Box<RawValue>>> {
let args = if let Some(args) = args {
let args_map: Option<HashMap<String, serde_json::Value>> =
serde_json::from_value(args.clone())
serde_json::from_str(args.get())
.map_err(|e| Error::BadRequest(format!("invalid json: {}", e)))?;

args_map
Expand All @@ -89,7 +89,7 @@ pub async fn get_url_from_runnable_value(
is_flow: bool,
db: &DB,
authed: ApiAuthed,
args: Option<&serde_json::Value>,
args: Option<&Box<RawValue>>,
workspace_id: &str,
) -> Result<String> {
tracing::info!(
Expand Down