Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions beacon_node/beacon_chain/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use beacon_chain::test_utils::get_kzg;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

use bls::Signature;
use kzg::KzgCommitment;
use kzg::{KzgCommitment, KzgProof};
use types::{
beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockDeneb, Blob, BlobsList, ChainSpec,
EmptyBlock, EthSpec, MainnetEthSpec, SignedBeaconBlock,
EmptyBlock, EthSpec, KzgProofs, MainnetEthSpec, SignedBeaconBlock,
};

fn create_test_block_and_blobs<E: EthSpec>(
num_of_blobs: usize,
spec: &ChainSpec,
) -> (SignedBeaconBlock<E>, BlobsList<E>) {
) -> (SignedBeaconBlock<E>, BlobsList<E>, KzgProofs<E>) {
let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec));
let mut body = block.body_mut();
let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap();
Expand All @@ -27,8 +27,9 @@ fn create_test_block_and_blobs<E: EthSpec>(
.map(|_| Blob::<E>::default())
.collect::<Vec<_>>()
.into();
let proofs = vec![KzgProof::empty(); num_of_blobs * spec.number_of_columns as usize].into();

(signed_block, blobs)
(signed_block, blobs, proofs)
}

fn all_benches(c: &mut Criterion) {
Expand All @@ -37,10 +38,11 @@ fn all_benches(c: &mut Criterion) {

let kzg = get_kzg(&spec);
for blob_count in [1, 2, 3, 6] {
let (signed_block, blobs) = create_test_block_and_blobs::<E>(blob_count, &spec);
let (signed_block, blobs, proofs) = create_test_block_and_blobs::<E>(blob_count, &spec);

let column_sidecars = blobs_to_data_column_sidecars(
&blobs.iter().collect::<Vec<_>>(),
proofs.to_vec(),
&signed_block,
&kzg,
&spec,
Expand Down
137 changes: 64 additions & 73 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fetch_blobs::EngineGetBlobsOutput;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::kzg_utils::reconstruct_blobs;
Expand Down Expand Up @@ -121,7 +122,6 @@ use store::{
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
Expand Down Expand Up @@ -3137,16 +3137,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
///
/// `data_column_recv`: An optional receiver for `DataColumnSidecarList`.
/// If PeerDAS is enabled, this receiver will be provided and used to send
/// the `DataColumnSidecar`s once they have been successfully computed.
pub async fn process_engine_blobs(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
Expand All @@ -3160,15 +3155,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -3618,20 +3610,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

async fn check_engine_blob_availability_and_import(
async fn check_engine_blobs_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;
let availability = match engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blobs_for_slashability(block_root, &blobs)?;
self.data_availability_checker
.put_engine_blobs(block_root, blobs)?
}
EngineGetBlobsOutput::DataColumns(data_columns) => {
self.check_columns_for_slashability(block_root, &data_columns)?;
self.data_availability_checker
.put_engine_data_columns(block_root, data_columns)?
}
};

self.process_availability(slot, availability, || Ok(()))
.await
Expand All @@ -3645,27 +3641,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
}
self.check_columns_for_slashability(block_root, &custody_columns)?;

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
Expand All @@ -3677,6 +3653,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

fn check_columns_for_slashability(
self: &Arc<Self>,
block_root: Hash256,
custody_columns: &DataColumnSidecarList<T::EthSpec>,
) -> Result<(), BlockError> {
let mut slashable_cache = self.observed_slashable.write();
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
Ok(())
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down Expand Up @@ -5798,15 +5799,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let kzg_proofs = Vec::from(proofs);

let kzg = self.kzg.as_ref();

// TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing.
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
blobs.iter().collect(),
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
if self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
kzg_utils::validate_blobs_and_cell_proofs::<T::EthSpec>(
kzg,
blobs.iter().collect(),
&kzg_proofs,
expected_kzg_commitments,
)
.map_err(BlockProductionError::KzgError)?;
} else {
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
blobs.iter().collect(),
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
}

Some((kzg_proofs.into(), blobs))
}
Expand Down Expand Up @@ -7118,27 +7130,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
}
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
debug!(
%block_root,
count = computed_data_columns.len(),
"Writing data columns to store"
);
// TODO(das): Store only this node's custody columns
Ok(Some(StoreOp::PutDataColumns(
block_root,
computed_data_columns,
)))
}
}
}
}
Expand Down
15 changes: 11 additions & 4 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ use tracing::{debug, error};
use types::{
data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError,
BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecutionBlockHash, FullPayload,
Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
Hash256, InconsistentFork, KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

pub const POS_PANDA_BANNER: &str = r#"
Expand Down Expand Up @@ -755,6 +755,7 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
blobs: BlobsList<T::EthSpec>,
kzg_cell_proofs: KzgProofs<T::EthSpec>,
) -> Result<DataColumnSidecarList<T::EthSpec>, DataColumnSidecarError> {
// Only attempt to build data columns if blobs is non empty to avoid skewing the metrics.
if blobs.is_empty() {
Expand All @@ -766,8 +767,14 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
&[&blobs.len().to_string()],
);
let blob_refs = blobs.iter().collect::<Vec<_>>();
let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec)
.discard_timer_on_break(&mut timer)?;
let sidecars = blobs_to_data_column_sidecars(
&blob_refs,
kzg_cell_proofs.to_vec(),
block,
&chain.kzg,
&chain.spec,
)
.discard_timer_on_break(&mut timer)?;
drop(timer);
Ok(sidecars)
}
Expand Down
59 changes: 48 additions & 11 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::kzg_utils::build_data_column_sidecars;
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::observed_data_sidecars::ObservedDataSidecars;
Expand All @@ -30,6 +30,7 @@ use logging::crit;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use rayon::prelude::*;
use slasher::Slasher;
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::{per_slot_processing, AllCaches};
Expand All @@ -40,8 +41,8 @@ use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use tracing::{debug, error, info};
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, DataColumnSidecarList, Epoch,
EthSpec, FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
};

/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
Expand Down Expand Up @@ -546,15 +547,8 @@ where
{
// After PeerDAS recompute columns from blobs to not force the checkpointz server
// into exposing another route.
let blobs = blobs
.iter()
.map(|blob_sidecar| &blob_sidecar.blob)
.collect::<Vec<_>>();
let data_columns =
blobs_to_data_column_sidecars(&blobs, &weak_subj_block, &self.kzg, &self.spec)
.map_err(|e| {
format!("Failed to compute weak subjectivity data_columns: {e:?}")
})?;
build_data_columns_from_blobs(&weak_subj_block, &blobs, &self.kzg, &self.spec)?;
// TODO(das): only persist the columns under custody
store
.put_data_columns(&weak_subj_block_root, data_columns)
Expand Down Expand Up @@ -1138,6 +1132,49 @@ fn descriptive_db_error(item: &str, error: &StoreError) -> String {
)
}

/// Build data columns and proofs from blobs.
fn build_data_columns_from_blobs<E: EthSpec>(
block: &SignedBeaconBlock<E>,
blobs: &BlobSidecarList<E>,
kzg: &Kzg,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, String> {
let blob_cells_and_proofs_vec = blobs
.into_par_iter()
.map(|blob_sidecar| {
let kzg_blob_ref = blob_sidecar
.blob
.as_ref()
.try_into()
.map_err(|e| format!("Failed to convert blob to kzg blob: {e:?}"))?;
let cells_and_proofs = kzg
.compute_cells_and_proofs(kzg_blob_ref)
.map_err(|e| format!("Failed to compute cell kzg proofs: {e:?}"))?;
Ok(cells_and_proofs)
})
.collect::<Result<Vec<_>, String>>()?;

let data_columns = {
let beacon_block_body = block.message().body();
let kzg_commitments = beacon_block_body
.blob_kzg_commitments()
.cloned()
.map_err(|e| format!("Unexpected pre Deneb block: {e:?}"))?;
let kzg_commitments_inclusion_proof = beacon_block_body
.kzg_commitments_merkle_proof()
.map_err(|e| format!("Failed to compute kzg commitments merkle proof: {e:?}"))?;
build_data_column_sidecars(
kzg_commitments,
kzg_commitments_inclusion_proof,
block.signed_block_header(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))?
};
Ok(data_columns)
}

#[cfg(not(debug_assertions))]
#[cfg(test)]
mod test {
Expand Down
Loading
Loading