Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

## [Unreleased](https://github.com/axelarnetwork/axelar-amplifier/tree/HEAD)

[Full Changelog](https://github.com/axelarnetwork/axelar-amplifier/compare/ampd-v1.12.1..HEAD)
[Full Changelog](https://github.com/axelarnetwork/axelar-amplifier/compare/ampd-v1.12.2..HEAD)

## [v1.12.2](https://github.com/axelarnetwork/axelar-amplifier/tree/ampd-v1.12.2) (2025-10-03)

[Full Changelog](https://github.com/axelarnetwork/axelar-amplifier/compare/ampd-v1.12.1..ampd-v1.12.2)

- add cancellation token to confirmer and broadcaster [#1056](https://github.com/axelarnetwork/axelar-amplifier/pull/1056)
- add name to task runs [#1055](https://github.com/axelarnetwork/axelar-amplifier/pull/1055)
- rotate out blastapi rpcs [#1054](https://github.com/axelarnetwork/axelar-amplifier/pull/1054)

## [v1.12.1](https://github.com/axelarnetwork/axelar-amplifier/tree/ampd-v1.12.1) (2025-09-25)

Expand Down
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ strum = { version = "0.25", default-features = false, features = ["derive"] }
sui-gateway = { version = "^1.0.0", path = "packages/sui-gateway" }
sui-types = { version = "^1.0.0", path = "packages/sui-types" }
syn = "2.0.92"
temp-env = "0.3.6"
tendermint = "0.35.0"
thiserror = "1.0.61"
tofn = { version = "1.1" }
Expand Down
3 changes: 2 additions & 1 deletion ampd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "ampd"
edition = { workspace = true }
version = "1.12.1"
version = "1.12.2"
rust-version = { workspace = true }
license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down Expand Up @@ -122,6 +122,7 @@ goldie = { workspace = true }
multisig = { workspace = true, features = ["test", "library"] }
rand = { workspace = true }
random-string = "1.0.0"
temp-env = { workspace = true, features = ["async_closure"] }
tendermint-proto = { version = "0.40.3" }
test-log = { version = "0.2", features = ["trace"], default-features = false }
tokio = { workspace = true, features = ["test-util"] }
Expand Down
210 changes: 144 additions & 66 deletions ampd/src/asyncutil/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::future::Future;
use std::pin::Pin;

use axelar_wasm_std::error::extend_err;
use error_stack::{Context, Result, ResultExt};
use error_stack::{report, Context, Result, ResultExt};
use thiserror::Error;
use tokio::io::Error as IoError;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{info, warn};

/// Heap-allocated, pinned future that is `Send` and resolves to a value of type `T`.
type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// This type represents an awaitable action that can be cancelled. It abstracts away the necessary boxing and pinning
Expand All @@ -33,15 +34,15 @@ impl<T> CancellableTask<T> {

/// A named group of cancellable tasks that are collected first and run together later.
/// Every task in the group resolves to `Result<(), E>`.
pub struct TaskGroup<E>
where
E: From<TaskError> + Context,
E: Context,
{
// group label; it is passed on to the shutdown log messages when the group is run
name: String,
// tasks registered so far; nothing is started while they sit in this list
tasks: Vec<CancellableTask<Result<(), E>>>,
tasks: Vec<(String, CancellableTask<Result<(), E>>)>,
}

impl<E> TaskGroup<E>
where
E: From<TaskError> + Context,
E: Context,
{
pub fn new(name: impl Into<String>) -> Self {
TaskGroup {
Expand All @@ -51,72 +52,109 @@ where
}

/// The added tasks won't be started until [Self::run] is called
pub fn add_task(mut self, task: CancellableTask<Result<(), E>>) -> Self {
self.tasks.push(task);
pub fn add_task(
mut self,
task_name: impl Into<String>,
task: CancellableTask<Result<(), E>>,
) -> Self {
self.tasks.push((task_name.into(), task));
self
}

/// Runs all tasks concurrently. If one task fails, all others are cancelled and the collection of errors is returned.
/// If a task panics, it still returns an error to the manager, so the parent process can shut down all tasks gracefully.
pub async fn run(self, token: CancellationToken) -> Result<(), E> {
pub async fn run(self, token: CancellationToken) -> Result<(), TaskGroupError> {
// running tasks and waiting for them is tightly coupled, so they both share a cloned token
let mut running_tasks = start_tasks(self.tasks, token.clone());
wait_for_completion(self.name, &mut running_tasks, &token).await
}
}

/// Spawns every given task onto a fresh [`JoinSet`] and returns that set.
/// Each task is handed its own child token derived from `token`.
fn start_tasks<T>(tasks: Vec<CancellableTask<T>>, token: CancellationToken) -> JoinSet<T>
fn start_tasks<T>(
tasks: Vec<(String, CancellableTask<T>)>,
token: CancellationToken,
) -> JoinSet<(String, T)>
where
T: Send + 'static,
{
let mut join_set = JoinSet::new();

for task in tasks.into_iter() {
for (task_name, task) in tasks.into_iter() {
// tasks clean up on their own after the cancellation token is triggered, so we discard the abort handles.
// However, we don't know what tasks will do with their token, so we need to create new child tokens here,
// so each task can act independently
join_set.spawn(task.run(token.child_token()));
let child_token = token.child_token();
join_set.spawn(async move {
let result = task.run(child_token).await;
(task_name, result)
});
}
join_set
}

/// Drains `running_tasks` until every task has finished and folds their outcomes into a single result.
/// The shared token is cancelled as soon as the first task stops, and a progress line is logged for
/// each finished task using `group_name` and the finished/total task counts.
async fn wait_for_completion<E>(
group_name: String,
running_tasks: &mut JoinSet<Result<(), E>>,
running_tasks: &mut JoinSet<(String, Result<(), E>)>,
token: &CancellationToken,
) -> Result<(), E>
) -> Result<(), TaskGroupError>
where
E: From<TaskError> + Context,
E: Context,
{
let mut final_result = Ok(());
let total_task_count = running_tasks.len();
while let Some(task_result) = running_tasks.join_next().await {
// if one task stops, all others should stop as well, so we cancel the token.
// Any call to this after the first is a no-op, so no need to guard it.
token.cancel();
info!(
"shutting down {} sub-tasks ({}/{})",
group_name,
total_task_count.saturating_sub(running_tasks.len()),
total_task_count
);

final_result = match task_result.change_context(E::from(TaskError {})) {
Err(err) | Ok(Err(err)) => extend_err(final_result, err),
Ok(_) => final_result,
};

match task_result {
Ok((task_name, task_result)) => {
info!(
"shutting down {} sub-tasks ({}/{}) - task '{}' completed",
group_name,
total_task_count.saturating_sub(running_tasks.len()),
total_task_count,
task_name
);

final_result = match task_result {
Err(err) => extend_err(final_result, err.change_context(TaskError {})),
Ok(()) => final_result,
};
}
Err(join_error) => {
warn!(
"shutting down {} sub-tasks ({}/{}) - task aborted or panicked: {}",
group_name,
total_task_count.saturating_sub(running_tasks.len()),
total_task_count,
join_error
);

final_result = extend_err(
final_result,
report!(IoError::from(join_error)).change_context(TaskError {}),
);
}
}
}

final_result
final_result.change_context(TaskGroupError {})
}

/// Error context attached to the failure of an individual task within a task group.
#[derive(Error, Debug)]
#[error("task failed")]
pub struct TaskError;

/// Error context wrapped around the accumulated task errors when running a task group fails.
#[derive(Error, Debug)]
#[error("task group execution failed")]
pub struct TaskGroupError;

#[cfg(test)]
mod test {
use error_stack::report;
use error_stack::fmt::ColorMode;
use error_stack::{report, Report};
use temp_env::async_with_vars;
use tokio_util::sync::CancellationToken;

use crate::asyncutil::task::{CancellableTask, TaskError, TaskGroup};
Expand All @@ -135,53 +173,93 @@ mod test {
};

let tasks: TaskGroup<TaskError> = TaskGroup::new("test")
.add_task(CancellableTask::create(waiting_task))
.add_task(CancellableTask::create(waiting_task))
.add_task(CancellableTask::create(|_| async { Ok(()) }))
.add_task(CancellableTask::create(waiting_task));
.add_task("waiting_task_1", CancellableTask::create(waiting_task))
.add_task("waiting_task_2", CancellableTask::create(waiting_task))
.add_task(
"immediate_task",
CancellableTask::create(|_| async { Ok(()) }),
)
.add_task("waiting_task_3", CancellableTask::create(waiting_task));
assert!(tasks.run(CancellationToken::new()).await.is_ok());
}

#[tokio::test]
async fn collect_all_errors_on_completion() {
let tasks = TaskGroup::new("test")
.add_task(CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}))
.add_task(CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}))
.add_task(CancellableTask::create(|_| async { Ok(()) }))
.add_task(CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}))
.add_task(CancellableTask::create(|_| async { Ok(()) }))
.add_task(CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}));
let result = tasks.run(CancellationToken::new()).await;
let err = result.unwrap_err();
assert_eq!(err.current_frames().len(), 4);
async_with_vars([("RUST_BACKTRACE", Some("0"))], async {
let tasks = TaskGroup::new("test")
.add_task(
"error_task_1",
CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}),
)
.add_task(
"error_task_2",
CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}),
)
.add_task(
"success_task_1",
CancellableTask::create(|_| async { Ok(()) }),
)
.add_task(
"error_task_3",
CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}),
)
.add_task(
"success_task_2",
CancellableTask::create(|_| async { Ok(()) }),
)
.add_task(
"error_task_4",
CancellableTask::create(|token| async move {
token.cancelled().await;
Err(report!(TaskError {}))
}),
);
let result = tasks.run(CancellationToken::new()).await;
let err = result.unwrap_err();
Report::set_color_mode(ColorMode::None);
goldie::assert_debug!(err);
})
.await;
}

#[tokio::test]
async fn shutdown_gracefully_on_task_panic() {
let tasks = TaskGroup::new("test")
.add_task(CancellableTask::create(|_| async { Ok(()) }))
.add_task(CancellableTask::create(|_| async { panic!("panic") }))
.add_task(CancellableTask::create(|_| async {
Err(report!(TaskError {}))
}))
.add_task(CancellableTask::create(|_| async { Ok(()) }))
.add_task(CancellableTask::create(|_| async {
Err(report!(TaskError {}))
}));
let result = tasks.run(CancellationToken::new()).await;
let err = result.unwrap_err();
assert_eq!(err.current_frames().len(), 3);
async_with_vars([("RUST_BACKTRACE", Some("0"))], async {
let tasks = TaskGroup::new("test")
.add_task(
"success_task_1",
CancellableTask::create(|_| async { Ok(()) }),
)
.add_task(
"panic_task",
CancellableTask::create(|_| async { panic!("panic") }),
)
.add_task(
"error_task",
CancellableTask::create(|_| async { Err(report!(TaskError {})) }),
)
.add_task(
"success_task_2",
CancellableTask::create(|_| async { Ok(()) }),
)
.add_task(
"error_task_2",
CancellableTask::create(|_| async { Err(report!(TaskError {})) }),
);
let result = tasks.run(CancellationToken::new()).await;
let err = result.unwrap_err();
Report::set_color_mode(ColorMode::None);
goldie::assert_debug!(err);
})
.await;
}
}
Loading
Loading