Skip to content

Commit 16d011f

Browse files
authored
Merge pull request #233 from alpenlabs/EXP-27-watch-core-tasks
[EXP-27] Watch tasks and handle exit
2 parents 194e274 + f617586 commit 16d011f

File tree

24 files changed

+799
-141
lines changed

24 files changed

+799
-141
lines changed

.github/workflows/unit.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ jobs:
1919
test:
2020
name: Run unit tests and generate report
2121
runs-on: ubuntu-latest
22+
timeout-minutes: 40 # better fail-safe than the default of 360 minutes in GitHub Actions
2223
steps:
2324
- name: Cleanup space
2425
# https://github.com/actions/runner-images/issues/2840#issuecomment-790492173

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ members = [
2020
"crates/rpc/utils",
2121
"crates/state",
2222
"crates/storage",
23+
"crates/tasks",
2324
"crates/test-utils",
2425
"crates/util/mmr",
2526
"crates/vtxjmt",
@@ -53,6 +54,7 @@ express-reth-exex = { path = "crates/reth/exex" }
5354
express-reth-rpc = { path = "crates/reth/rpc" }
5455
express-rpc-utils = { path = "crates/rpc/utils" }
5556
express-storage = { path = "crates/storage" }
57+
express-tasks = { path = "crates/tasks" }
5658
express-zkvm = { path = "crates/prover/zkvm" } # FIXME make these names more consistent?
5759
zkvm-primitives = { path = "crates/prover/primitives" }
5860

@@ -72,6 +74,7 @@ digest = "0.10"
7274
eyre = "0.6"
7375
format_serde_error = { git = "https://github.com/AlexanderThaller/format_serde_error" }
7476
futures = "0.3"
77+
futures-util = "0.3"
7578
hex = { version = "0.4", features = ["serde"] }
7679
http = "1.0.0"
7780
hyper = "0.14.25"

crates/btcio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ alpen-express-db = { workspace = true }
88
alpen-express-primitives = { workspace = true }
99
alpen-express-rpc-types = { workspace = true }
1010
alpen-express-state = { workspace = true }
11+
express-tasks = { workspace = true }
1112

1213
anyhow = { workspace = true }
1314
async-trait = { workspace = true }

crates/btcio/src/reader/query.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ use std::collections::VecDeque;
22
use std::sync::Arc;
33
use std::time::{Duration, SystemTime, UNIX_EPOCH};
44

5-
use alpen_express_rpc_types::types::L1Status;
65
use anyhow::bail;
76
use bitcoin::{Block, BlockHash};
87
use tokio::sync::{mpsc, RwLock};
98
use tracing::*;
109

10+
use alpen_express_rpc_types::types::L1Status;
11+
1112
use super::config::ReaderConfig;
1213
use super::messages::{BlockData, L1Event};
1314
use crate::rpc::traits::L1Client;

crates/btcio/src/writer/watcher.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::{sync::Arc, time::Duration};
22

3+
use tracing::*;
4+
35
use alpen_express_db::{
46
traits::SequencerDatabase,
57
types::{BlobEntry, BlobL1Status},
68
};
7-
use tracing::*;
89

910
use crate::{
1011
rpc::traits::{L1Client, SeqL1Client},

crates/btcio/src/writer/writer_handler.rs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
use std::sync::Arc;
22

3-
use tokio::{
4-
runtime::Runtime,
5-
sync::{
6-
mpsc::{self, Receiver, Sender},
7-
RwLock,
8-
},
3+
use tokio::sync::{
4+
mpsc::{self, Receiver, Sender},
5+
RwLock,
96
};
107
use tracing::*;
118

@@ -16,6 +13,7 @@ use alpen_express_db::{
1613
use alpen_express_primitives::buf::Buf32;
1714
use alpen_express_rpc_types::L1Status;
1815
use alpen_express_state::da_blob::BlobIntent;
16+
use express_tasks::TaskExecutor;
1917

2018
use super::broadcast::broadcaster_task;
2119
use super::config::WriterConfig;
@@ -66,40 +64,46 @@ impl<D: SequencerDatabase + Send + Sync + 'static> DaWriter<D> {
6664
}
6765

6866
pub fn start_writer_task<D: SequencerDatabase + Send + Sync + 'static>(
67+
executor: &TaskExecutor,
6968
rpc_client: Arc<impl SeqL1Client + L1Client>,
7069
config: WriterConfig,
7170
db: Arc<D>,
72-
rt: &Runtime,
7371
l1_status: Arc<RwLock<L1Status>>,
7472
) -> anyhow::Result<DaWriter<D>> {
7573
info!("Starting writer control task");
7674

7775
let (signer_tx, signer_rx) = mpsc::channel::<BlobIdx>(10);
7876

79-
let init_state = initialize_writer_state(db.clone())?;
77+
let WriterInitialState {
78+
next_watch_blob_idx,
79+
next_publish_blob_idx,
80+
} = initialize_writer_state(db.clone())?;
8081

8182
// The watcher task watches L1 for tx confirmations and finalizations. Ideally this should be
8283
// taken care of by the reader task. This can be done later.
83-
rt.spawn(watcher_task(
84-
init_state.next_watch_blob_idx,
85-
rpc_client.clone(),
86-
config.clone(),
87-
db.clone(),
88-
));
89-
90-
rt.spawn(broadcaster_task(
91-
init_state.next_publish_blob_idx,
92-
rpc_client.clone(),
93-
db.clone(),
94-
l1_status.clone(),
95-
));
96-
97-
rt.spawn(listen_for_signing_intents(
98-
signer_rx,
99-
rpc_client,
100-
config,
101-
db.clone(),
102-
));
84+
let rpc_client_w = rpc_client.clone();
85+
let config_w = config.clone();
86+
let db_w = db.clone();
87+
executor.spawn_critical_async("btcio::watcher_task", async move {
88+
watcher_task(next_watch_blob_idx, rpc_client_w, config_w, db_w)
89+
.await
90+
.unwrap()
91+
});
92+
93+
let rpc_client_b = rpc_client.clone();
94+
let db_b = db.clone();
95+
executor.spawn_critical_async("btcio::broadcaster_task", async move {
96+
broadcaster_task(next_publish_blob_idx, rpc_client_b, db_b, l1_status.clone())
97+
.await
98+
.unwrap()
99+
});
100+
101+
let db_s = db.clone();
102+
executor.spawn_critical_async("btcio::listen_for_signing_intents", async {
103+
listen_for_signing_intents(signer_rx, rpc_client, config, db_s)
104+
.await
105+
.unwrap()
106+
});
103107

104108
Ok(DaWriter { signer_tx, db })
105109
}

crates/consensus-logic/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ alpen-express-primitives = { workspace = true }
1111
alpen-express-state = { workspace = true }
1212
express-chaintsn = { workspace = true }
1313
express-storage = { workspace = true }
14+
express-tasks = { workspace = true }
1415

1516
anyhow = { workspace = true }
1617
bitcoin = { workspace = true }

crates/consensus-logic/src/duty/worker.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ use std::collections::HashMap;
44
use std::sync::{Arc, RwLock};
55
use std::time;
66

7-
use alpen_express_btcio::writer::DaWriter;
8-
use alpen_express_primitives::buf::{Buf32, Buf64};
9-
use alpen_express_state::batch::{BatchCommitment, SignedBatchCommitment};
10-
use alpen_express_state::da_blob::{BlobDest, BlobIntent};
11-
use express_storage::L2BlockManager;
127
use tokio::sync::broadcast;
138
use tracing::*;
149

10+
use alpen_express_btcio::writer::DaWriter;
1511
use alpen_express_db::traits::*;
1612
use alpen_express_eectl::engine::ExecEngineCtl;
13+
use alpen_express_primitives::buf::{Buf32, Buf64};
1714
use alpen_express_primitives::params::Params;
15+
use alpen_express_state::batch::{BatchCommitment, SignedBatchCommitment};
1816
use alpen_express_state::client_state::ClientState;
17+
use alpen_express_state::da_blob::{BlobDest, BlobIntent};
1918
use alpen_express_state::prelude::*;
19+
use express_storage::L2BlockManager;
20+
use express_tasks::{ShutdownGuard, TaskExecutor};
2021

2122
use super::types::{self, Duty, DutyBatch, Identity, IdentityKey};
2223
use super::{block_assembly, extractor};
@@ -26,6 +27,7 @@ use crate::message::{ClientUpdateNotif, ForkChoiceMessage};
2627
use crate::sync_manager::SyncManager;
2728

2829
pub fn duty_tracker_task<D: Database>(
30+
shutdown: ShutdownGuard,
2931
cupdate_rx: broadcast::Receiver<Arc<ClientUpdateNotif>>,
3032
batch_queue: broadcast::Sender<DutyBatch>,
3133
ident: Identity,
@@ -35,6 +37,7 @@ pub fn duty_tracker_task<D: Database>(
3537
) {
3638
let db = database.as_ref();
3739
if let Err(e) = duty_tracker_task_inner(
40+
shutdown,
3841
cupdate_rx,
3942
batch_queue,
4043
ident,
@@ -47,6 +50,7 @@ pub fn duty_tracker_task<D: Database>(
4750
}
4851

4952
fn duty_tracker_task_inner(
53+
shutdown: ShutdownGuard,
5054
mut cupdate_rx: broadcast::Receiver<Arc<ClientUpdateNotif>>,
5155
batch_queue: broadcast::Sender<DutyBatch>,
5256
ident: Identity,
@@ -65,6 +69,10 @@ fn duty_tracker_task_inner(
6569
duties_tracker.set_finalized_block(last_finalized_blk);
6670

6771
loop {
72+
if shutdown.should_shutdown() {
73+
warn!("received shutdown signal");
74+
break;
75+
}
6876
let update = match cupdate_rx.blocking_recv() {
6977
Ok(u) => u,
7078
Err(broadcast::error::RecvError::Closed) => {
@@ -182,12 +190,14 @@ struct DutyExecStatus {
182190
result: Result<(), Error>,
183191
}
184192

185-
#[allow(clippy::too_many_arguments)]
193+
#[allow(clippy::too_many_arguments)] // FIXME
186194
pub fn duty_dispatch_task<
187195
D: Database + Sync + Send + 'static,
188196
E: ExecEngineCtl + Sync + Send + 'static,
189197
S: SequencerDatabase + Sync + Send + 'static,
190198
>(
199+
shutdown: ShutdownGuard,
200+
executor: TaskExecutor,
191201
mut updates: broadcast::Receiver<DutyBatch>,
192202
ident_key: IdentityKey,
193203
sync_man: Arc<SyncManager>,
@@ -206,7 +216,7 @@ pub fn duty_dispatch_task<
206216
let (duty_status_tx, duty_status_rx) = std::sync::mpsc::channel::<DutyExecStatus>();
207217

208218
let pending_duties_t = pending_duties.clone();
209-
std::thread::spawn(move || loop {
219+
executor.spawn_critical("pending duty tracker", move |shutdown| loop {
210220
if let Ok(DutyExecStatus { id, result }) = duty_status_rx.recv() {
211221
if let Err(e) = result {
212222
error!(err = %e, "error performing duty");
@@ -216,10 +226,18 @@ pub fn duty_dispatch_task<
216226
if pending_duties_t.write().unwrap().remove(&id).is_none() {
217227
warn!(%id, "tried to remove non-existent duty");
218228
}
229+
if shutdown.should_shutdown() {
230+
warn!("received shutdown signal");
231+
break;
232+
}
219233
}
220234
});
221235

222236
loop {
237+
if shutdown.should_shutdown() {
238+
warn!("received shutdown signal");
239+
break;
240+
}
223241
let update = match updates.blocking_recv() {
224242
Ok(u) => u,
225243
Err(broadcast::error::RecvError::Closed) => {

crates/consensus-logic/src/fork_choice_manager.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use alpen_express_state::prelude::*;
1717
use alpen_express_state::state_op::StateCache;
1818
use alpen_express_state::sync_event::SyncEvent;
1919
use express_storage::L2BlockManager;
20+
use express_tasks::ShutdownGuard;
2021

2122
use crate::ctl::CsmController;
2223
use crate::message::ForkChoiceMessage;
@@ -142,12 +143,18 @@ pub fn init_forkchoice_manager<D: Database>(
142143
/// Receives inputs from the FCM channel until we receive a signal that we've
143144
/// reached a point where we've done genesis.
144145
fn wait_for_csm_ready(
146+
shutdown: &ShutdownGuard,
145147
fcm_rx: &mut mpsc::Receiver<ForkChoiceMessage>,
146148
) -> anyhow::Result<Arc<ClientState>> {
147149
while let Some(msg) = fcm_rx.blocking_recv() {
148150
if let Some(state) = process_early_fcm_msg(msg) {
149151
return Ok(state);
150152
}
153+
154+
if shutdown.should_shutdown() {
155+
warn!("received shutdown signal");
156+
break;
157+
}
151158
}
152159

153160
warn!("CSM task exited without providing new state");
@@ -214,6 +221,7 @@ fn determine_start_tip(
214221

215222
/// Main tracker task that takes a ready fork choice manager and some IO stuff.
216223
pub fn tracker_task<D: Database, E: ExecEngineCtl>(
224+
shutdown: ShutdownGuard,
217225
database: Arc<D>,
218226
l2_block_manager: Arc<L2BlockManager>,
219227
engine: Arc<E>,
@@ -223,7 +231,7 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
223231
) {
224232
// Wait until the CSM gives us a state we can start from.
225233
info!("waiting for CSM ready");
226-
let init_state = match wait_for_csm_ready(&mut fcm_rx) {
234+
let init_state = match wait_for_csm_ready(&shutdown, &mut fcm_rx) {
227235
Ok(s) => s,
228236
Err(e) => {
229237
error!(err = %e, "failed to initialize forkchoice manager");
@@ -260,18 +268,25 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
260268
}
261269
};
262270

263-
if let Err(e) = forkchoice_manager_task_inner(fcm, engine.as_ref(), fcm_rx, &csm_ctl) {
271+
if let Err(e) = forkchoice_manager_task_inner(&shutdown, fcm, engine.as_ref(), fcm_rx, &csm_ctl)
272+
{
264273
error!(err = %e, "tracker aborted");
265274
}
266275
}
267276

268277
fn forkchoice_manager_task_inner<D: Database, E: ExecEngineCtl>(
278+
shutdown: &ShutdownGuard,
269279
mut state: ForkChoiceManager<D>,
270280
engine: &E,
271281
mut fcm_rx: mpsc::Receiver<ForkChoiceMessage>,
272282
csm_ctl: &CsmController,
273283
) -> anyhow::Result<()> {
274284
loop {
285+
if shutdown.should_shutdown() {
286+
warn!("received shutdown signal");
287+
break;
288+
}
289+
275290
let Some(m) = fcm_rx.blocking_recv() else {
276291
break;
277292
};

0 commit comments

Comments
 (0)