Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
test:
name: Run unit tests and generate report
runs-on: ubuntu-latest
timeout-minutes: 40 # better fail-safe than the default 360 in github actions
steps:
- name: Cleanup space
# https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"crates/rpc/utils",
"crates/state",
"crates/storage",
"crates/tasks",
"crates/test-utils",
"crates/util/mmr",
"crates/vtxjmt",
Expand Down Expand Up @@ -53,6 +54,7 @@ express-reth-exex = { path = "crates/reth/exex" }
express-reth-rpc = { path = "crates/reth/rpc" }
express-rpc-utils = { path = "crates/rpc/utils" }
express-storage = { path = "crates/storage" }
express-tasks = { path = "crates/tasks" }
express-zkvm = { path = "crates/prover/zkvm" } # FIXME make these names more consistent?
zkvm-primitives = { path = "crates/prover/primitives" }

Expand All @@ -72,6 +74,7 @@ digest = "0.10"
eyre = "0.6"
format_serde_error = { git = "https://github.com/AlexanderThaller/format_serde_error" }
futures = "0.3"
futures-util = "0.3"
hex = { version = "0.4", features = ["serde"] }
http = "1.0.0"
hyper = "0.14.25"
Expand Down
1 change: 1 addition & 0 deletions crates/btcio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ alpen-express-db = { workspace = true }
alpen-express-primitives = { workspace = true }
alpen-express-rpc-types = { workspace = true }
alpen-express-state = { workspace = true }
express-tasks = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/btcio/src/reader/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use alpen_express_rpc_types::types::L1Status;
use anyhow::bail;
use bitcoin::{Block, BlockHash};
use tokio::sync::{mpsc, RwLock};
use tracing::*;

use alpen_express_rpc_types::types::L1Status;

use super::config::ReaderConfig;
use super::messages::{BlockData, L1Event};
use crate::rpc::traits::L1Client;
Expand Down
3 changes: 2 additions & 1 deletion crates/btcio/src/writer/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{sync::Arc, time::Duration};

use tracing::*;

use alpen_express_db::{
traits::SequencerDatabase,
types::{BlobEntry, BlobL1Status},
};
use tracing::*;

use crate::{
rpc::traits::{L1Client, SeqL1Client},
Expand Down
60 changes: 32 additions & 28 deletions crates/btcio/src/writer/writer_handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::sync::Arc;

use tokio::{
runtime::Runtime,
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
},
use tokio::sync::{
mpsc::{self, Receiver, Sender},
RwLock,
};
use tracing::*;

Expand All @@ -16,6 +13,7 @@ use alpen_express_db::{
use alpen_express_primitives::buf::Buf32;
use alpen_express_rpc_types::L1Status;
use alpen_express_state::da_blob::BlobIntent;
use express_tasks::TaskExecutor;

use super::broadcast::broadcaster_task;
use super::config::WriterConfig;
Expand Down Expand Up @@ -66,40 +64,46 @@ impl<D: SequencerDatabase + Send + Sync + 'static> DaWriter<D> {
}

pub fn start_writer_task<D: SequencerDatabase + Send + Sync + 'static>(
executor: &TaskExecutor,
rpc_client: Arc<impl SeqL1Client + L1Client>,
config: WriterConfig,
db: Arc<D>,
rt: &Runtime,
l1_status: Arc<RwLock<L1Status>>,
) -> anyhow::Result<DaWriter<D>> {
info!("Starting writer control task");

let (signer_tx, signer_rx) = mpsc::channel::<BlobIdx>(10);

let init_state = initialize_writer_state(db.clone())?;
let WriterInitialState {
next_watch_blob_idx,
next_publish_blob_idx,
} = initialize_writer_state(db.clone())?;

// The watcher task watches L1 for txs confirmations and finalizations. Ideally this should be
// taken care of by the reader task. This can be done later.
rt.spawn(watcher_task(
init_state.next_watch_blob_idx,
rpc_client.clone(),
config.clone(),
db.clone(),
));

rt.spawn(broadcaster_task(
init_state.next_publish_blob_idx,
rpc_client.clone(),
db.clone(),
l1_status.clone(),
));

rt.spawn(listen_for_signing_intents(
signer_rx,
rpc_client,
config,
db.clone(),
));
let rpc_client_w = rpc_client.clone();
let config_w = config.clone();
let db_w = db.clone();
executor.spawn_critical_async("btcio::watcher_task", async move {
watcher_task(next_watch_blob_idx, rpc_client_w, config_w, db_w)
.await
.unwrap()
});

let rpc_client_b = rpc_client.clone();
let db_b = db.clone();
executor.spawn_critical_async("btcio::broadcaster_task", async move {
broadcaster_task(next_publish_blob_idx, rpc_client_b, db_b, l1_status.clone())
.await
.unwrap()
});

let db_s = db.clone();
executor.spawn_critical_async("btcio::listen_for_signing_intents", async {
listen_for_signing_intents(signer_rx, rpc_client, config, db_s)
.await
.unwrap()
});

Ok(DaWriter { signer_tx, db })
}
Expand Down
1 change: 1 addition & 0 deletions crates/consensus-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ alpen-express-primitives = { workspace = true }
alpen-express-state = { workspace = true }
express-chaintsn = { workspace = true }
express-storage = { workspace = true }
express-tasks = { workspace = true }

anyhow = { workspace = true }
bitcoin = { workspace = true }
Expand Down
32 changes: 25 additions & 7 deletions crates/consensus-logic/src/duty/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time;

use alpen_express_btcio::writer::DaWriter;
use alpen_express_primitives::buf::{Buf32, Buf64};
use alpen_express_state::batch::{BatchCommitment, SignedBatchCommitment};
use alpen_express_state::da_blob::{BlobDest, BlobIntent};
use express_storage::L2BlockManager;
use tokio::sync::broadcast;
use tracing::*;

use alpen_express_btcio::writer::DaWriter;
use alpen_express_db::traits::*;
use alpen_express_eectl::engine::ExecEngineCtl;
use alpen_express_primitives::buf::{Buf32, Buf64};
use alpen_express_primitives::params::Params;
use alpen_express_state::batch::{BatchCommitment, SignedBatchCommitment};
use alpen_express_state::client_state::ClientState;
use alpen_express_state::da_blob::{BlobDest, BlobIntent};
use alpen_express_state::prelude::*;
use express_storage::L2BlockManager;
use express_tasks::{ShutdownGuard, TaskExecutor};

use super::types::{self, Duty, DutyBatch, Identity, IdentityKey};
use super::{block_assembly, extractor};
Expand All @@ -26,6 +27,7 @@ use crate::message::{ClientUpdateNotif, ForkChoiceMessage};
use crate::sync_manager::SyncManager;

pub fn duty_tracker_task<D: Database>(
shutdown: ShutdownGuard,
cupdate_rx: broadcast::Receiver<Arc<ClientUpdateNotif>>,
batch_queue: broadcast::Sender<DutyBatch>,
ident: Identity,
Expand All @@ -35,6 +37,7 @@ pub fn duty_tracker_task<D: Database>(
) {
let db = database.as_ref();
if let Err(e) = duty_tracker_task_inner(
shutdown,
cupdate_rx,
batch_queue,
ident,
Expand All @@ -47,6 +50,7 @@ pub fn duty_tracker_task<D: Database>(
}

fn duty_tracker_task_inner(
shutdown: ShutdownGuard,
mut cupdate_rx: broadcast::Receiver<Arc<ClientUpdateNotif>>,
batch_queue: broadcast::Sender<DutyBatch>,
ident: Identity,
Expand All @@ -65,6 +69,10 @@ fn duty_tracker_task_inner(
duties_tracker.set_finalized_block(last_finalized_blk);

loop {
if shutdown.should_shutdown() {
warn!("received shutdown signal");
break;
}
let update = match cupdate_rx.blocking_recv() {
Ok(u) => u,
Err(broadcast::error::RecvError::Closed) => {
Expand Down Expand Up @@ -182,12 +190,14 @@ struct DutyExecStatus {
result: Result<(), Error>,
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments)] // FIXME
pub fn duty_dispatch_task<
D: Database + Sync + Send + 'static,
E: ExecEngineCtl + Sync + Send + 'static,
S: SequencerDatabase + Sync + Send + 'static,
>(
shutdown: ShutdownGuard,
executor: TaskExecutor,
mut updates: broadcast::Receiver<DutyBatch>,
ident_key: IdentityKey,
sync_man: Arc<SyncManager>,
Expand All @@ -206,7 +216,7 @@ pub fn duty_dispatch_task<
let (duty_status_tx, duty_status_rx) = std::sync::mpsc::channel::<DutyExecStatus>();

let pending_duties_t = pending_duties.clone();
std::thread::spawn(move || loop {
executor.spawn_critical("pending duty tracker", move |shutdown| loop {
if let Ok(DutyExecStatus { id, result }) = duty_status_rx.recv() {
if let Err(e) = result {
error!(err = %e, "error performing duty");
Expand All @@ -216,10 +226,18 @@ pub fn duty_dispatch_task<
if pending_duties_t.write().unwrap().remove(&id).is_none() {
warn!(%id, "tried to remove non-existent duty");
}
if shutdown.should_shutdown() {
warn!("received shutdown signal");
break;
}
}
});

loop {
if shutdown.should_shutdown() {
warn!("received shutdown signal");
break;
}
let update = match updates.blocking_recv() {
Ok(u) => u,
Err(broadcast::error::RecvError::Closed) => {
Expand Down
19 changes: 17 additions & 2 deletions crates/consensus-logic/src/fork_choice_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use alpen_express_state::prelude::*;
use alpen_express_state::state_op::StateCache;
use alpen_express_state::sync_event::SyncEvent;
use express_storage::L2BlockManager;
use express_tasks::ShutdownGuard;

use crate::ctl::CsmController;
use crate::message::ForkChoiceMessage;
Expand Down Expand Up @@ -142,12 +143,18 @@ pub fn init_forkchoice_manager<D: Database>(
/// Recvs inputs from the FCM channel until we receive a signal that we've
/// reached a point where we've done genesis.
fn wait_for_csm_ready(
shutdown: &ShutdownGuard,
fcm_rx: &mut mpsc::Receiver<ForkChoiceMessage>,
) -> anyhow::Result<Arc<ClientState>> {
while let Some(msg) = fcm_rx.blocking_recv() {
if let Some(state) = process_early_fcm_msg(msg) {
return Ok(state);
}

if shutdown.should_shutdown() {
warn!("received shutdown signal");
break;
}
}

warn!("CSM task exited without providing new state");
Expand Down Expand Up @@ -214,6 +221,7 @@ fn determine_start_tip(

/// Main tracker task that takes a ready fork choice manager and some IO stuff.
pub fn tracker_task<D: Database, E: ExecEngineCtl>(
shutdown: ShutdownGuard,
database: Arc<D>,
l2_block_manager: Arc<L2BlockManager>,
engine: Arc<E>,
Expand All @@ -223,7 +231,7 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
) {
// Wait until the CSM gives us a state we can start from.
info!("waiting for CSM ready");
let init_state = match wait_for_csm_ready(&mut fcm_rx) {
let init_state = match wait_for_csm_ready(&shutdown, &mut fcm_rx) {
Ok(s) => s,
Err(e) => {
error!(err = %e, "failed to initialize forkchoice manager");
Expand Down Expand Up @@ -260,18 +268,25 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
}
};

if let Err(e) = forkchoice_manager_task_inner(fcm, engine.as_ref(), fcm_rx, &csm_ctl) {
if let Err(e) = forkchoice_manager_task_inner(&shutdown, fcm, engine.as_ref(), fcm_rx, &csm_ctl)
{
error!(err = %e, "tracker aborted");
}
}

fn forkchoice_manager_task_inner<D: Database, E: ExecEngineCtl>(
shutdown: &ShutdownGuard,
mut state: ForkChoiceManager<D>,
engine: &E,
mut fcm_rx: mpsc::Receiver<ForkChoiceMessage>,
csm_ctl: &CsmController,
) -> anyhow::Result<()> {
loop {
if shutdown.should_shutdown() {
warn!("received shutdown signal");
break;
}

let Some(m) = fcm_rx.blocking_recv() else {
break;
};
Expand Down
Loading