Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 83 additions & 102 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ pub struct Config {
pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
}

impl Default for Config {
Expand All @@ -152,6 +154,7 @@ impl Default for Config {
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
}
}
}
Expand Down Expand Up @@ -508,6 +511,8 @@ pub fn serve<T: BeaconChainTypes>(
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));

let duplicate_block_status_code = ctx.config.duplicate_block_status_code;

/*
*
* Start of HTTP method definitions.
Expand Down Expand Up @@ -1282,11 +1287,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
Expand All @@ -1295,9 +1300,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1312,11 +1317,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(&block_bytes, &chain.spec)
Expand All @@ -1332,9 +1337,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1350,12 +1355,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
Expand All @@ -1364,9 +1369,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1382,12 +1387,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(&block_bytes, &chain.spec)
Expand All @@ -1403,9 +1408,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1425,21 +1430,21 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block: SignedBlindedBeaconBlock<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1455,13 +1460,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
Expand All @@ -1474,9 +1479,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1492,87 +1497,63 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async(Priority::P0, async move {
match publish_blocks::publish_blinded_block(
move |validation_level: api_types::BroadcastValidationQuery,
block: SignedBlindedBeaconBlock<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
})
},
);

let post_beacon_blinded_blocks_v2_ssz =
eth_v2
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block =
match SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(_) => {
return warp::reply::with_status(
StatusCode::BAD_REQUEST,
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};
match publish_blocks::publish_blinded_block(
let post_beacon_blinded_blocks_v2_ssz = eth_v2
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
},
);
})
},
);

let block_id_or_err = warp::path::param::<BlockId>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
Expand Down
Loading