Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2955,7 +2955,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or erred.
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
Expand Down Expand Up @@ -2998,22 +2998,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

let block_slot = unverified_block.block().slot();

// Set observed time if not already set. Usually this should be set by gossip or RPC,
// but just in case we set it again here (useful for tests).
if let (Some(seen_timestamp), Some(current_slot)) =
(self.slot_clock.now_duration(), self.slot_clock.now())
{
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_time_observed(
block_root,
current_slot,
block_slot,
seen_timestamp,
None,
None,
);
}

let block_slot = unverified_block.block().slot();

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
Expand Down Expand Up @@ -3090,8 +3088,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Accepts a fully-verified block and awaits on it's payload verification handle to
/// get a fully `ExecutedBlock`
/// Accepts a fully-verified block and awaits on its payload verification handle to
/// get a fully `ExecutedBlock`.
///
/// An error is returned if the verification handle couldn't be awaited.
pub async fn into_executed_block(
Expand Down Expand Up @@ -3256,6 +3254,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = import_data;

// Record the time at which this block became available.
self.block_times_cache.write().set_time_available(
block_root,
block.slot(),
block.available_timestamp(),
);

// import
let chain = self.clone();
let block_root = self
Expand Down Expand Up @@ -3398,6 +3403,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Early attester cache insert failed";
"error" => ?e
);
} else {
let attestable_timestamp =
self.slot_clock.now_duration().unwrap_or_default();
self.block_times_cache.write().set_time_attestable(
block_root,
signed_block.slot(),
attestable_timestamp,
)
}
} else {
warn!(
Expand Down
50 changes: 49 additions & 1 deletion beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@ type BlockRoot = Hash256;
/// UNIX-epoch timestamps (durations since 1970) recording when a block reached
/// each stage of processing. Each field is `None` until the corresponding
/// event has been recorded.
#[derive(Clone, Default)]
pub struct Timestamps {
    /// When the block was first observed (usually set by gossip or RPC).
    pub observed: Option<Duration>,
    /// When the block's data became fully available (all blobs present).
    pub available: Option<Duration>,
    /// When the block became attestable, i.e. after insertion into the
    /// early-attester cache.
    pub attestable: Option<Duration>,
    /// When the block finished being imported.
    pub imported: Option<Duration>,
    /// When the block was set as the head of the chain.
    pub set_as_head: Option<Duration>,
}

// Helps arrange delay data so it is more relevant to metrics.
#[derive(Debug, Default)]
pub struct BlockDelays {
    /// Time after start of slot.
    pub observed: Option<Duration>,
    /// Time after `observed`.
    pub available: Option<Duration>,
    /// Time after `available`.
    pub attestable: Option<Duration>,
    /// ALSO time after `available`.
    ///
    /// We need to use `available` again rather than `attestable` to handle the case where the block
    /// does not get added to the early-attester cache.
    pub imported: Option<Duration>,
    /// Time after `imported`.
    pub set_as_head: Option<Duration>,
}

Expand All @@ -35,14 +47,22 @@ impl BlockDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let available = times
.available
.and_then(|available_time| available_time.checked_sub(times.observed?));
let attestable = times
.attestable
.and_then(|attestable_time| attestable_time.checked_sub(times.available?));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(times.observed?));
.and_then(|imported_time| imported_time.checked_sub(times.available?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
BlockDelays {
observed,
available,
attestable,
imported,
set_as_head,
}
Expand Down Expand Up @@ -109,6 +129,34 @@ impl BlockTimesCache {
}
}

/// Record the time at which the block identified by `block_root` became
/// available.
///
/// Only the earliest timestamp is retained: calling this again with a later
/// timestamp leaves the cached value untouched. A cache entry for the block
/// is created on demand if one does not already exist.
pub fn set_time_available(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
    let entry = self
        .cache
        .entry(block_root)
        .or_insert_with(|| BlockTimesCacheValue::new(slot));
    // Keep the minimum of the existing and the new timestamp.
    let earlier_than_cached = match entry.timestamps.available {
        None => true,
        Some(existing) => timestamp < existing,
    };
    if earlier_than_cached {
        entry.timestamps.available = Some(timestamp);
    }
}

/// Record the time at which the block identified by `block_root` became
/// attestable.
///
/// Only the earliest timestamp is retained: calling this again with a later
/// timestamp leaves the cached value untouched. A cache entry for the block
/// is created on demand if one does not already exist.
pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
    let entry = self
        .cache
        .entry(block_root)
        .or_insert_with(|| BlockTimesCacheValue::new(slot));
    // Keep the minimum of the existing and the new timestamp.
    let earlier_than_cached = match entry.timestamps.attestable {
        None => true,
        Some(existing) => timestamp < existing,
    };
    if earlier_than_cached {
        entry.timestamps.attestable = Some(timestamp);
    }
}

pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ type PayloadVerificationHandle<E> =
/// - Parent is known
/// - Signatures
/// - State root check
/// - Per block processing
/// - Blobs sidecar has been validated if present
/// - Block processing
///
/// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid
/// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the
Expand Down
27 changes: 23 additions & 4 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,9 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(

// If the block was enshrined as head too late for attestations to be created for it,
// log a debug warning and increment a metric.
let format_delay = |delay: &Option<Duration>| {
delay.map_or("unknown".to_string(), |d| format!("{}", d.as_millis()))
};
if late_head {
metrics::inc_counter(&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_EXCEEDED_TOTAL);
debug!(
Expand All @@ -1445,10 +1448,26 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"block_root" => ?head_block_root,
"proposer_index" => head_block_proposer_index,
"slot" => head_block_slot,
"block_delay" => ?block_delay_total,
"observed_delay" => ?block_delays.observed,
"imported_delay" => ?block_delays.imported,
"set_as_head_delay" => ?block_delays.set_as_head,
"block_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),
"imported_delay_ms" => format_delay(&block_delays.imported),
"set_as_head_delay_ms" => format_delay(&block_delays.set_as_head),
);
} else {
debug!(
log,
"On-time head block";
"block_root" => ?head_block_root,
"proposer_index" => head_block_proposer_index,
"slot" => head_block_slot,
"block_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),
"imported_delay_ms" => format_delay(&block_delays.imported),
"set_as_head_delay_ms" => format_delay(&block_delays.set_as_head),
);
}
}
Expand Down
40 changes: 36 additions & 4 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
Expand Down Expand Up @@ -213,7 +214,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::Kzg)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs)
.put_kzg_verified_blobs(block_root, verified_blobs, &self.slot_clock)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -225,8 +226,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
gossip_blob: GossipVerifiedBlob<T>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
vec![gossip_blob.into_inner()],
&self.slot_clock,
)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand All @@ -236,7 +240,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_pending_executed_block(executed_block)
.put_pending_executed_block(executed_block, &self.slot_clock)
}

/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
Expand All @@ -254,10 +258,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if self.blobs_required_for_block(&block) {
Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block })
} else {
let available_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
available_timestamp,
}))
}
}
Expand All @@ -273,10 +282,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
} else {
None
};
let available_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: verified_blobs,
available_timestamp,
}))
}
}
Expand Down Expand Up @@ -318,10 +332,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if self.blobs_required_for_block(&block) {
results.push(MaybeAvailableBlock::AvailabilityPending { block_root, block })
} else {
let available_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
results.push(MaybeAvailableBlock::Available(AvailableBlock {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fairly certain this function is only called during range-sync or backfill. So these blocks are not related to the current slot. Also due to the way sync works, none of the blocks hit this cache until all the data for all the blocks in the request are downloaded. Thus it's a bit meaningless to measure how far into the current slot they became available. Might be a good idea to exclude them from the data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it!

block_root,
block,
blobs: None,
available_timestamp,
}))
}
}
Expand All @@ -332,10 +351,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
None
};
// already verified kzg for all blobs
let available_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
results.push(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: verified_blobs,
available_timestamp,
}))
}
}
Expand Down Expand Up @@ -543,6 +567,8 @@ pub struct AvailableBlock<E: EthSpec> {
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
available_timestamp: Duration,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing the timestamp here will force you to include the timestamp for backfill & range-sync blocks which will only pollute the data. I think a better way would be to include this timestamp in the AvailableExecutedBlock & then populate it inside make_available(). This will probably do what I believe you actually want, as make_available() is the final destination of:

which are the 3 entry points for which new data can come in & complete a block.

The other entry points:

Don't actually hit the DA cache as they assume all blobs are already present.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried doing this just now but got stuck because to fill in the available_timestamp on AvailableExecutedBlock here would require it to be part of the AvailableBlock anyway:

MaybeAvailableBlock::Available(available_block) => {
Self::Available(AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
))
}


impl<E: EthSpec> AvailableBlock<E> {
Expand All @@ -555,6 +581,7 @@ impl<E: EthSpec> AvailableBlock<E> {
block_root,
block,
blobs,
available_timestamp: Duration::from_millis(0),
}
}

Expand All @@ -569,6 +596,10 @@ impl<E: EthSpec> AvailableBlock<E> {
self.blobs.as_ref()
}

pub fn available_timestamp(&self) -> Duration {
self.available_timestamp
}

pub fn deconstruct(
self,
) -> (
Expand All @@ -580,6 +611,7 @@ impl<E: EthSpec> AvailableBlock<E> {
block_root,
block,
blobs,
available_timestamp: _,
} = self;
(block_root, block, blobs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub enum Error {
ParentStateMissing(Hash256),
BlockReplayError(state_processing::BlockReplayError),
RebuildingStateCaches(BeaconStateError),
SlotClockError,
}

pub enum ErrorCategory {
Expand All @@ -39,7 +40,8 @@ impl Error {
| Error::Unexpected
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::RebuildingStateCaches(_) => ErrorCategory::Internal,
| Error::RebuildingStateCaches(_)
| Error::SlotClockError => ErrorCategory::Internal,
Error::Kzg(_)
| Error::BlobIndexInvalid(_)
| Error::KzgCommitmentMismatch { .. }
Expand Down
Loading