Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::{
metrics,
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
service::NetworkMessage,
sync::SyncMessage,
sync::{
manager::{BlockProcessSource, BlockProcessType},
SyncMessage,
},
};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
Expand Down Expand Up @@ -1187,19 +1190,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
Err(BlockError::ParentUnknown(_)) => {
// This should not occur. It should be checked by `should_forward_block`.
// Do not send sync message UnknownParentBlock to prevent conflicts with the
// BlockComponentProcessed message below. If this error ever happens, lookup sync
// can recover by receiving another block / blob / attestation referencing the
// chain that includes this block.
error!(
self.log,
"Block with unknown parent attempted to be processed";
"block_root" => %block_root,
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownParentBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
Expand Down Expand Up @@ -1263,6 +1265,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.log,
);
}

self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type: BlockProcessType::SingleBlock,
source: BlockProcessSource::Gossip(block_root),
result: result.into(),
});
}

pub fn process_gossip_voluntary_exit(
Expand Down
17 changes: 7 additions & 10 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessSource, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -407,13 +407,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> Result<(), Error<T::EthSpec>> {
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
block_root,
block,
seen_timestamp,
process_type,
source,
);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -428,18 +428,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> Result<(), Error<T::EthSpec>> {
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
if blob_count == 0 {
return Ok(());
}
let process_fn = self.clone().generate_rpc_blobs_process_fn(
block_root,
blobs,
seen_timestamp,
process_type,
);
let process_fn =
self.clone()
.generate_rpc_blobs_process_fn(block_root, blobs, seen_timestamp, source);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcBlobs { process_fn },
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::metrics;
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::manager::BlockProcessSource;
use crate::sync::BatchProcessResult;
use crate::sync::{
manager::{BlockProcessType, SyncMessage},
Expand Down Expand Up @@ -53,7 +54,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> AsyncFn {
let process_fn = async move {
let reprocess_tx = self.reprocess_tx.clone();
Expand All @@ -62,7 +63,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root,
block,
seen_timestamp,
process_type,
source,
reprocess_tx,
duplicate_cache,
)
Expand All @@ -77,20 +78,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> (AsyncFn, BlockingFn) {
// An async closure which will import the block.
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
block_root,
block,
seen_timestamp,
process_type.clone(),
source.clone(),
);
// A closure which will ignore the block.
let ignore_fn = move || {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlock,
source,
result: crate::sync::manager::BlockProcessingResult::Ignored,
});
};
Expand All @@ -104,7 +106,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
duplicate_cache: DuplicateCache,
) {
Expand All @@ -115,15 +117,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Gossip block is being processed";
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block_root,
"process_type" => ?process_type,
"source" => ?source,
);

// Send message to work reprocess queue to retry the block
let (process_fn, ignore_fn) = self.clone().generate_rpc_beacon_block_fns(
block_root,
block,
seen_timestamp,
process_type,
source,
);
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
beacon_block_root: block_root,
Expand All @@ -148,7 +150,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"proposer" => block.message().proposer_index(),
"slot" => block.slot(),
"commitments" => commitments_formatted,
"process_type" => ?process_type,
"source" => ?source,
);

let result = self
Expand All @@ -170,21 +172,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing this match?

It it redundant because we now only process single blocks here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, blobs are processed in process_rpc_blobs

self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);

self.chain.recompute_head_at_current_slot().await;
}
self.chain.recompute_head_at_current_slot().await;
}
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlock,
source,
result: result.into(),
});

Expand All @@ -201,11 +202,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, source)
.await;
};
Box::pin(process_fn)
Expand All @@ -217,7 +218,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) {
let Some(slot) = blobs
.iter()
Expand Down Expand Up @@ -298,7 +299,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlob,
source,
result: result.into(),
});
}
Expand Down
21 changes: 8 additions & 13 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError>;
) -> Result<LookupRequestResult, LookupRequestError>;

/* Response handling methods */

Expand Down Expand Up @@ -80,7 +80,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: PeerId,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}
Expand All @@ -97,10 +97,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: _,
} = download_result;
cx.send_block_for_processing(
id,
block_root,
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(LookupRequestError::SendFailed)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
Expand All @@ -149,13 +149,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(LookupRequestError::SendFailed)
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailed)
}

fn response_type() -> ResponseType {
Expand Down
Loading