Skip to content

Commit d382ea7

Browse files
authored
feat: support esm mode for codebase bundles (#6709)
* rawAppsS3 * make fn common * all * merge * nit * fix ingress * all * all * all * all
1 parent 0ba5e3e commit d382ea7

File tree

13 files changed

+352
-208
lines changed

13 files changed

+352
-208
lines changed

backend/windmill-api/src/jobs.rs

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use quick_cache::sync::Cache;
1818
use serde_json::value::RawValue;
1919
use serde_json::Value;
2020
use sqlx::Pool;
21+
use windmill_common::s3_helpers::{upload_artifact_to_store, BundleFormat};
2122
use std::collections::HashMap;
2223
use std::hash::{DefaultHasher, Hash, Hasher};
2324
use std::ops::{Deref, DerefMut};
@@ -41,7 +42,6 @@ use windmill_common::DYNAMIC_INPUT_CACHE;
4142
#[cfg(all(feature = "enterprise", feature = "smtp"))]
4243
use windmill_common::{email_oss::send_email_html, server::load_smtp_config};
4344

44-
use windmill_common::scripts::PREVIEW_IS_CODEBASE_HASH;
4545
use windmill_common::variables::get_workspace_key;
4646

4747
use crate::triggers::trigger_helpers::ScriptId;
@@ -3526,6 +3526,7 @@ struct Preview {
35263526
tag: Option<String>,
35273527
dedicated_worker: Option<bool>,
35283528
lock: Option<String>,
3529+
format: Option<String>
35293530
}
35303531

35313532
#[derive(Deserialize)]
@@ -5590,6 +5591,7 @@ async fn run_wait_result_preview_script(
55905591
return result;
55915592
}
55925593

5594+
55935595
async fn run_bundle_preview_script(
55945596
authed: ApiAuthed,
55955597
Extension(db): Extension<DB>,
@@ -5598,7 +5600,6 @@ async fn run_bundle_preview_script(
55985600
Query(run_query): Query<RunJobQuery>,
55995601
mut multipart: axum::extract::Multipart,
56005602
) -> error::Result<(StatusCode, String)> {
5601-
use windmill_common::scripts::PREVIEW_IS_TAR_CODEBASE_HASH;
56025603

56035604
if authed.is_operator {
56045605
return Err(error::Error::NotAuthorized(
@@ -5610,13 +5611,15 @@ async fn run_bundle_preview_script(
56105611
let mut tx = None;
56115612
let mut uploaded = false;
56125613
let mut is_tar = false;
5614+
let mut format = BundleFormat::Cjs;
56135615

56145616
while let Some(field) = multipart.next_field().await.unwrap() {
56155617
let name = field.name().unwrap().to_string();
56165618
let data = field.bytes().await;
56175619
let data = data.map_err(to_anyhow)?;
56185620
if name == "preview" {
56195621
let preview: Preview = serde_json::from_slice(&data).map_err(to_anyhow)?;
5622+
format = preview.format.and_then(|s| BundleFormat::from_string(&s)).unwrap_or(BundleFormat::Cjs);
56205623

56215624
let scheduled_for = run_query.get_scheduled_for(&db).await?;
56225625
let tag = run_query.tag.clone().or(preview.tag.clone());
@@ -5637,11 +5640,7 @@ async fn run_bundle_preview_script(
56375640
ltx,
56385641
&w_id,
56395642
JobPayload::Code(RawCode {
5640-
hash: if is_tar {
5641-
Some(PREVIEW_IS_TAR_CODEBASE_HASH)
5642-
} else {
5643-
Some(PREVIEW_IS_CODEBASE_HASH)
5644-
},
5643+
hash: Some(windmill_common::scripts::codebase_to_hash(is_tar, format == BundleFormat::Esm)),
56455644
content: preview.content.unwrap_or_default(),
56465645
path: preview.path,
56475646
language: preview.language.unwrap_or(ScriptLang::Deno),
@@ -5690,52 +5689,17 @@ async fn run_bundle_preview_script(
56905689

56915690
// tracing::info!("is_tar 2: {is_tar}");
56925691

5692+
if format == BundleFormat::Esm {
5693+
id = format!("{}.esm", id);
5694+
}
56935695
if is_tar {
56945696
id = format!("{}.tar", id);
56955697
}
56965698

56975699
uploaded = true;
56985700

5699-
#[cfg(all(feature = "enterprise", feature = "parquet"))]
5700-
let object_store = windmill_common::s3_helpers::get_object_store().await;
5701-
5702-
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
5703-
let object_store: Option<()> = None;
5704-
5705-
if &windmill_common::utils::MODE_AND_ADDONS.mode
5706-
== &windmill_common::utils::Mode::Standalone
5707-
&& object_store.is_none()
5708-
{
5709-
std::fs::create_dir_all(
5710-
windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR.clone(),
5711-
)?;
5712-
windmill_common::worker::write_file_bytes(
5713-
&windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR,
5714-
&id,
5715-
&data,
5716-
)?;
5717-
} else {
5718-
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
5719-
{
5720-
return Err(Error::ExecutionErr("codebase is an EE feature".to_string()));
5721-
}
5722-
5723-
#[cfg(all(feature = "enterprise", feature = "parquet"))]
5724-
if let Some(os) = object_store {
5725-
check_license_key_valid().await?;
5726-
5727-
let path = windmill_common::s3_helpers::bundle(&w_id, &id);
5728-
if let Err(e) = os
5729-
.put(&object_store::path::Path::from(path.clone()), data.into())
5730-
.await
5731-
{
5732-
tracing::info!("Failed to put snapshot to s3 at {path}: {:?}", e);
5733-
return Err(Error::ExecutionErr(format!("Failed to put {path} to s3")));
5734-
}
5735-
} else {
5736-
return Err(Error::BadConfig("Object store is required for snapshot script and is not configured for servers".to_string()));
5737-
}
5738-
}
5701+
let path = windmill_common::s3_helpers::bundle(&w_id, &id);
5702+
upload_artifact_to_store(&path, data, &windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR).await?;
57395703
}
57405704
// println!("Length of `{}` is {} bytes", name, data.len());
57415705
}

backend/windmill-api/src/scripts.rs

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,7 @@ use windmill_audit::ActionKind;
4141
use windmill_worker::process_relative_imports;
4242

4343
use windmill_common::{
44-
assets::{clear_asset_usage, insert_asset_usage, AssetUsageKind, AssetWithAltAccessType},
45-
error::to_anyhow,
46-
scripts::hash_script,
47-
utils::WarnAfterExt,
48-
worker::CLOUD_HOSTED,
44+
assets::{clear_asset_usage, insert_asset_usage, AssetUsageKind, AssetWithAltAccessType}, error::to_anyhow, s3_helpers::upload_artifact_to_store, scripts::hash_script, utils::WarnAfterExt, worker::CLOUD_HOSTED
4945
};
5046

5147
use windmill_common::{
@@ -421,45 +417,8 @@ async fn create_snapshot_script(
421417

422418
uploaded = true;
423419

424-
#[cfg(all(feature = "enterprise", feature = "parquet"))]
425-
let object_store = windmill_common::s3_helpers::get_object_store().await;
426-
427-
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
428-
let object_store: Option<()> = None;
429-
430-
if &windmill_common::utils::MODE_AND_ADDONS.mode
431-
== &windmill_common::utils::Mode::Standalone
432-
&& object_store.is_none()
433-
{
434-
std::fs::create_dir_all(
435-
windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR.clone(),
436-
)?;
437-
windmill_common::worker::write_file_bytes(
438-
&windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR,
439-
&hash,
440-
&data,
441-
)?;
442-
} else {
443-
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
444-
{
445-
return Err(Error::ExecutionErr("codebase is an EE feature".to_string()));
446-
}
447-
448-
#[cfg(all(feature = "enterprise", feature = "parquet"))]
449-
if let Some(os) = object_store {
450-
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
451-
452-
if let Err(e) = os
453-
.put(&object_store::path::Path::from(path.clone()), data.into())
454-
.await
455-
{
456-
tracing::info!("Failed to put snapshot to s3 at {path}: {:?}", e);
457-
return Err(Error::ExecutionErr(format!("Failed to put {path} to s3")));
458-
}
459-
} else {
460-
return Err(Error::BadConfig("Object store is required for snapshot script and is not configured for servers".to_string()));
461-
}
462-
}
420+
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
421+
upload_artifact_to_store(&path, data, &windmill_common::worker::ROOT_STANDALONE_BUNDLE_DIR).await?;
463422
}
464423
// println!("Length of `{}` is {} bytes", name, data.len());
465424
}
@@ -479,6 +438,7 @@ async fn create_snapshot_script(
479438
return Ok((StatusCode::CREATED, format!("{}", script_hash.unwrap())));
480439
}
481440

441+
482442
async fn list_paths_from_workspace_runnable(
483443
authed: ApiAuthed,
484444
Extension(user_db): Extension<UserDB>,

backend/windmill-common/src/s3_helpers.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,67 @@ pub async fn build_object_store_client(
526526
}
527527
}
528528

529+
530+
#[derive(PartialEq)]
531+
pub enum BundleFormat {
532+
Esm,
533+
Cjs,
534+
}
535+
536+
impl BundleFormat {
537+
pub fn from_string(s: &str) -> Option<Self> {
538+
match s {
539+
"esm" => Some(Self::Esm),
540+
"cjs" => Some(Self::Cjs),
541+
_ => None,
542+
}
543+
}
544+
}
545+
546+
pub async fn upload_artifact_to_store(path: &str, data: bytes::Bytes, standalone_dir: &str) -> error::Result<()> {
547+
#[cfg(all(feature = "enterprise", feature = "parquet"))]
548+
let object_store = crate::s3_helpers::get_object_store().await;
549+
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
550+
let object_store: Option<()> = None;
551+
Ok(if &crate::utils::MODE_AND_ADDONS.mode
552+
== &crate::utils::Mode::Standalone
553+
&& object_store.is_none()
554+
{
555+
let path = format!("{}/{}", standalone_dir, path);
556+
tracing::info!("Writing file to path {path}");
557+
558+
let split_path = path.split("/").collect::<Vec<&str>>();
559+
std::fs::create_dir_all(
560+
split_path[..split_path.len() - 1].join("/"),
561+
)?;
562+
563+
crate::worker::write_file_bytes(
564+
&path,
565+
&data,
566+
)?;
567+
} else {
568+
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
569+
{
570+
return Err(error::Error::ExecutionErr("codebase is an EE feature".to_string()));
571+
}
572+
573+
#[cfg(all(feature = "enterprise", feature = "parquet"))]
574+
if let Some(os) = object_store {
575+
576+
if let Err(e) = os
577+
.put(&object_store::path::Path::from(path), data.into())
578+
.await
579+
{
580+
tracing::info!("Failed to put snapshot to s3 at {path}: {:?}", e);
581+
return Err(error::Error::ExecutionErr(format!("Failed to put {path} to s3")));
582+
}
583+
} else {
584+
return Err(error::Error::BadConfig("Object store is required for snapshot script and is not configured for servers".to_string()));
585+
}
586+
})
587+
}
588+
589+
529590
#[cfg(feature = "parquet")]
530591
pub async fn attempt_fetch_bytes(
531592
client: Arc<dyn ObjectStore>,

backend/windmill-common/src/scripts.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,53 @@ impl Display for ScriptKind {
211211
}
212212
}
213213

214-
pub const PREVIEW_IS_CODEBASE_HASH: i64 = -42;
215-
pub const PREVIEW_IS_TAR_CODEBASE_HASH: i64 = -43;
214+
const PREVIEW_IS_CODEBASE_HASH: i64 = -42;
215+
const PREVIEW_IS_TAR_CODEBASE_HASH: i64 = -43;
216+
const PREVIEW_IS_ESM_CODEBASE_HASH: i64 = -44;
217+
const PREVIEW_IS_TAR_ESM_CODEBASE_HASH: i64 = -45;
216218

219+
pub fn is_special_codebase_hash(hash: i64) -> bool {
220+
hash == PREVIEW_IS_CODEBASE_HASH || hash == PREVIEW_IS_TAR_CODEBASE_HASH || hash == PREVIEW_IS_ESM_CODEBASE_HASH || hash == PREVIEW_IS_TAR_ESM_CODEBASE_HASH
221+
}
222+
223+
pub fn codebase_to_hash(is_tar: bool, is_esm: bool) -> i64 {
224+
if is_tar {
225+
if is_esm {
226+
PREVIEW_IS_TAR_ESM_CODEBASE_HASH
227+
} else {
228+
PREVIEW_IS_TAR_CODEBASE_HASH
229+
}
230+
} else {
231+
if is_esm {
232+
PREVIEW_IS_ESM_CODEBASE_HASH
233+
} else {
234+
PREVIEW_IS_CODEBASE_HASH
235+
}
236+
}
237+
}
238+
239+
240+
pub fn hash_to_codebase_id(job_id: &str, hash: i64) -> Option<String> {
241+
match hash {
242+
PREVIEW_IS_CODEBASE_HASH => Some(job_id.to_string()),
243+
PREVIEW_IS_TAR_CODEBASE_HASH => Some(format!("{}.tar", job_id)),
244+
PREVIEW_IS_ESM_CODEBASE_HASH => Some(format!("{}.esm", job_id)),
245+
PREVIEW_IS_TAR_ESM_CODEBASE_HASH => Some(format!("{}.esm.tar", job_id)),
246+
_ => None,
247+
}
248+
}
249+
250+
251+
pub struct CodebaseInfo {
252+
pub is_tar: bool,
253+
pub is_esm: bool,
254+
}
255+
256+
pub fn id_to_codebase_info(id: &str) -> CodebaseInfo {
257+
let is_tar = id.ends_with(".tar");
258+
let is_esm = id.contains(".esm");
259+
CodebaseInfo { is_tar, is_esm }
260+
}
217261
#[derive(Serialize, sqlx::FromRow)]
218262
pub struct Script {
219263
pub workspace_id: String,

backend/windmill-common/src/worker.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ lazy_static::lazy_static! {
258258
// Features flags:
259259
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
260260

261-
pub static ref ROOT_STANDALONE_BUNDLE_DIR: String = format!("{}/.windmill/standalone_bundle/", std::env::var("HOME").unwrap_or_else(|_| "/root".to_string()));
261+
pub static ref ROOT_STANDALONE_BUNDLE_DIR: String = format!("{}/.windmill/standalone_bundle", std::env::var("HOME").unwrap_or_else(|_| "/root".to_string()));
262262
}
263263

264264
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
@@ -470,9 +470,8 @@ pub fn write_file(dir: &str, path: &str, content: &str) -> error::Result<File> {
470470
Ok(file)
471471
}
472472

473-
pub fn write_file_bytes(dir: &str, path: &str, content: &Bytes) -> error::Result<File> {
474-
let path = format!("{}/{}", dir, path);
475-
let mut file = File::create(&path)?;
473+
pub fn write_file_bytes(path: &str, content: &Bytes) -> error::Result<File> {
474+
let mut file = File::create(path)?;
476475
file.write_all(content)?;
477476
file.flush()?;
478477
Ok(file)

0 commit comments

Comments
 (0)