Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ impl TryInto<Hash256> for AvailabilityProcessingStatus {
/// The result of a chain segment processing.
pub enum ChainSegmentResult<E: EthSpec> {
/// Processing this chain segment finished successfully.
Successful { imported_blocks: usize },
Successful {
imported_blocks: Vec<(Hash256, Slot)>,
},
/// There was an error processing this chain segment. Before the error, some blocks could
/// have been imported.
Failed {
imported_blocks: usize,
imported_blocks: Vec<(Hash256, Slot)>,
error: BlockError<E>,
},
}
Expand Down Expand Up @@ -2709,7 +2711,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
) -> Result<Vec<HashBlockTuple<T::EthSpec>>, ChainSegmentResult<T::EthSpec>> {
// This function will never import any blocks.
let imported_blocks = 0;
let imported_blocks = vec![];
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());

// Produce a list of the parent root and slot of the child of each block.
Expand Down Expand Up @@ -2815,7 +2817,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
) -> ChainSegmentResult<T::EthSpec> {
let mut imported_blocks = 0;
let mut imported_blocks = vec![];

// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
Expand Down Expand Up @@ -2875,6 +2877,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Import the blocks into the chain.
for signature_verified_block in signature_verified_blocks {
let block_slot = signature_verified_block.slot();
match self
.process_block(
signature_verified_block.block_root(),
Expand All @@ -2886,9 +2889,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(_) => {
AvailabilityProcessingStatus::Imported(block_root) => {
// The block was imported successfully.
imported_blocks += 1;
imported_blocks.push((block_root, block_slot));
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
warn!(self.log, "Blobs missing in response to range request";
Expand Down Expand Up @@ -6823,6 +6826,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

/// Returns true if we should issue a sampling request for this block
/// TODO(das): check if the block is still within the da_window
pub fn should_sample_slot(&self, slot: Slot) -> bool {
self.spec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

self.spec.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))

.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
}

pub fn logger(&self) -> &Logger {
&self.log
}
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,10 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
pub fn block_root(&self) -> Hash256 {
self.block_root
}

pub fn slot(&self) -> Slot {
self.block.slot()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBlock<T> {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ async fn add_base_block_to_altair_chain() {
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,
..
error: BlockError::InconsistentFork(InconsistentFork {
fork_at_slot: ForkName::Altair,
object_fork: ForkName::Base,
Expand Down Expand Up @@ -1497,7 +1497,7 @@ async fn add_altair_block_to_base_chain() {
)
.await,
ChainSegmentResult::Failed {
imported_blocks: 0,
..
error: BlockError::InconsistentFork(InconsistentFork {
fork_at_slot: ForkName::Base,
object_fork: ForkName::Altair,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,20 +1278,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

// TODO(das) Might be too early to issue a request here. We haven't checked that the block
// actually includes blob transactions and thus has data. A peer could send a block is
// garbage commitments, and make us trigger sampling for a block that does not have data.
if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
// either:
// - Sync delaying sampling until some latter window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
if self
.chain
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block.epoch() >= eip7594_fork_epoch
})
{
if self.chain.should_sample_slot(block.slot()) {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}
}
Expand Down
30 changes: 26 additions & 4 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

let slot = block.slot();
let block_has_data = block.as_block().num_expected_blobs() > 0;
let parent_root = block.message().parent_root();
let commitments_formatted = block.as_block().commitments_formatted();

Expand Down Expand Up @@ -184,6 +185,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await;
}
}

// RPC block imported or execution validated. If the block was already imported by gossip we
// receive Err(BlockError::AlreadyKnown).
if result.is_ok() &&
// Block has at least one blob, so it produced columns
block_has_data &&
// Block slot is within the DA boundary (should always be the case) and PeerDAS is activated
self.chain.should_sample_slot(slot)
{
self.send_sync_message(SyncMessage::SampleBlock(block_root, slot));
}

// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
Expand Down Expand Up @@ -491,21 +504,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
{
ChainSegmentResult::Successful { imported_blocks } => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;

for (block_root, block_slot) in &imported_blocks {
if self.chain.should_sample_slot(*block_slot) {
self.send_sync_message(SyncMessage::SampleBlock(
*block_root,
*block_slot,
));
}
}
}
(imported_blocks, Ok(()))
(imported_blocks.len(), Ok(()))
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
(imported_blocks, r)
(imported_blocks.len(), r)
}
}
}
Expand Down