Skip to content

Commit 5d6da77

Browse files
committed
Ack scheduled task
1 parent d6e4be6 commit 5d6da77

File tree

3 files changed

+12
-7
lines changed

3 files changed

+12
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.1.8-multi"
3+
version = "0.1.9"
44
edition = "2021"
55
default-run = "openworkers-runner"
66

@@ -14,7 +14,7 @@ tokio = "1.43.0"
1414
env_logger = "0.11.6"
1515
http_v02 = { package = "http", version = "0.2.12" }
1616
sqlx = { version = "0.8.3", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal", "rust_decimal" ] }
17-
openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.8"}
17+
openworkers-runtime = { git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.8"}
1818
# openworkers-runtime = { path = "../openworkers-runtime" }
1919
nats = "0.25.0"
2020
serde_json = "1.0.135"

src/event_scheduled.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct ScheduledData {
1919
pub worker_id: String,
2020
}
2121

22-
fn run_scheduled(data: ScheduledData, script: Script) {
22+
fn run_scheduled(data: ScheduledData, script: Script, nc: nats::Connection) {
2323
let (res_tx, res_rx) = tokio::sync::oneshot::channel::<()>();
2424

2525
let task = Task::Scheduled(Some(ScheduledInit::new(res_tx, data.scheduled_time)));
@@ -47,12 +47,17 @@ fn run_scheduled(data: ScheduledData, script: Script) {
4747

4848
log::debug!("scheduled task listener started");
4949

50-
match local.block_on(&rt, async { res_rx.await } ) {
50+
match local.block_on(&rt, async { res_rx.await }) {
5151
Ok(()) => {}
5252
Err(err) => log::error!("failed to wait for end: {err}"),
5353
}
5454

5555
log::debug!("scheduled task listener stopped");
56+
57+
match nc.publish(&format!("scheduled.ack.{}", data.id), b"") {
58+
Ok(()) => {}
59+
Err(err) => log::error!("failed to ack: {err}"),
60+
}
5661
});
5762
}
5863

@@ -108,11 +113,11 @@ pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
108113
code: crate::transform::parse_worker_code(&worker),
109114
env: match worker.env {
110115
Some(env) => Some(env.deref().to_owned()),
111-
None => None
116+
None => None,
112117
},
113118
};
114119

115-
run_scheduled(data, script);
120+
run_scheduled(data, script, nc.clone());
116121
}
117122

118123
log::debug!("scheduled task listener stopped");

0 commit comments

Comments
 (0)