Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions beacon_node/beacon_chain/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use beacon_chain::test_utils::get_kzg;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

use bls::Signature;
use kzg::KzgCommitment;
use kzg::{KzgCommitment, KzgProof};
use types::{
beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockDeneb, Blob, BlobsList, ChainSpec,
EmptyBlock, EthSpec, MainnetEthSpec, SignedBeaconBlock,
EmptyBlock, EthSpec, KzgProofs, MainnetEthSpec, SignedBeaconBlock,
};

fn create_test_block_and_blobs<E: EthSpec>(
num_of_blobs: usize,
spec: &ChainSpec,
) -> (SignedBeaconBlock<E>, BlobsList<E>) {
) -> (SignedBeaconBlock<E>, BlobsList<E>, KzgProofs<E>) {
let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec));
let mut body = block.body_mut();
let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap();
Expand All @@ -27,8 +27,9 @@ fn create_test_block_and_blobs<E: EthSpec>(
.map(|_| Blob::<E>::default())
.collect::<Vec<_>>()
.into();
let proofs = vec![KzgProof::empty(); num_of_blobs * spec.number_of_columns as usize].into();

(signed_block, blobs)
(signed_block, blobs, proofs)
}

fn all_benches(c: &mut Criterion) {
Expand All @@ -37,10 +38,11 @@ fn all_benches(c: &mut Criterion) {

let kzg = get_kzg(&spec);
for blob_count in [1, 2, 3, 6] {
let (signed_block, blobs) = create_test_block_and_blobs::<E>(blob_count, &spec);
let (signed_block, blobs, proofs) = create_test_block_and_blobs::<E>(blob_count, &spec);

let column_sidecars = blobs_to_data_column_sidecars(
&blobs.iter().collect::<Vec<_>>(),
proofs.to_vec(),
&signed_block,
&kzg,
&spec,
Expand Down
120 changes: 72 additions & 48 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fetch_blobs::EngineGetBlobsOutput;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
Expand Down Expand Up @@ -122,7 +123,6 @@ use store::{
KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
Expand Down Expand Up @@ -3151,8 +3151,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
Expand All @@ -3166,15 +3165,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -3634,20 +3630,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

async fn check_engine_blob_availability_and_import(
async fn check_engine_blobs_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;
let availability = match engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blobs_for_slashability(block_root, &blobs)?;
self.data_availability_checker
.put_engine_blobs(block_root, blobs)?
}
EngineGetBlobsOutput::DataColumnReceiver(data_column_receiver) => {
// TODO: need to get rid of receiver
// self.check_columns_for_slashability(block_root, data_columns)
self.data_availability_checker.put_data_column_receiver(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
data_column_receiver,
)?
}
};

self.process_availability(slot, availability, || Ok(()))
.await
Expand All @@ -3661,27 +3665,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
}
self.check_columns_for_slashability(block_root, &custody_columns)?;

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
Expand All @@ -3693,6 +3677,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

fn check_columns_for_slashability(
self: &Arc<Self>,
block_root: Hash256,
custody_columns: &DataColumnSidecarList<T::EthSpec>,
) -> Result<(), BlockError> {
let mut slashable_cache = self.observed_slashable.write();
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
Ok(())
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down Expand Up @@ -5894,15 +5903,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let kzg_proofs = Vec::from(proofs);

let kzg = self.kzg.as_ref();

// TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing.
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
blobs.iter().collect(),
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
if self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
// TODO: extend blobs and verify cells against proofs
// let cells = kzg_utils::compute_cells(kzg, &blobs.iter().collect::<Vec<_>>())
// .map_err(BlockProductionError::KzgError)?;
// kzg_utils::verify_cell_proof_batch::<T::EthSpec>(
// kzg,
// &cells,
// cell_indices,
// kzg_proofs,
// kzg_commitments,
// )
// .map_err(BlockProductionError::KzgError)?;
} else {
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
blobs.iter().collect(),
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;
}

Some((kzg_proofs.into(), blobs))
}
Expand Down
15 changes: 11 additions & 4 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ use task_executor::JoinHandle;
use types::{
data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError,
BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecutionBlockHash, FullPayload,
Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
Hash256, InconsistentFork, KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

pub const POS_PANDA_BANNER: &str = r#"
Expand Down Expand Up @@ -748,6 +748,7 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
blobs: BlobsList<T::EthSpec>,
kzg_cell_proofs: KzgProofs<T::EthSpec>,
) -> Result<DataColumnSidecarList<T::EthSpec>, DataColumnSidecarError> {
// Only attempt to build data columns if blobs is non empty to avoid skewing the metrics.
if blobs.is_empty() {
Expand All @@ -759,8 +760,14 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
&[&blobs.len().to_string()],
);
let blob_refs = blobs.iter().collect::<Vec<_>>();
let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec)
.discard_timer_on_break(&mut timer)?;
let sidecars = blobs_to_data_column_sidecars(
&blob_refs,
kzg_cell_proofs.to_vec(),
block,
&chain.kzg,
&chain.spec,
)
.discard_timer_on_break(&mut timer)?;
drop(timer);
Ok(sidecars)
}
Expand Down
22 changes: 14 additions & 8 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,15 +569,21 @@ where
{
// After PeerDAS recompute columns from blobs to not force the checkpointz server
// into exposing another route.
let blobs = blobs
// FIXME: the getBlobs endpoint only returns blob proofs - we need to use
// getBlobsSidecarV2 or compute cell proofs.

let (blobs, proofs): (Vec<_>, Vec<_>) = blobs
.iter()
.map(|blob_sidecar| &blob_sidecar.blob)
.collect::<Vec<_>>();
let data_columns =
blobs_to_data_column_sidecars(&blobs, &weak_subj_block, &self.kzg, &self.spec)
.map_err(|e| {
format!("Failed to compute weak subjectivity data_columns: {e:?}")
})?;
.map(|blob_sidecar| (&blob_sidecar.blob, blob_sidecar.kzg_proof))
.unzip();
let data_columns = blobs_to_data_column_sidecars(
&blobs,
proofs,
&weak_subj_block,
&self.kzg,
&self.spec,
)
.map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))?;
// TODO(das): only persist the columns under custody
store
.put_data_columns(&weak_subj_block_root, data_columns)
Expand Down
45 changes: 24 additions & 21 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,29 +252,32 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_engine_blobs(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_columns_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// `data_columns_recv` is always Some if block_root is post-PeerDAS
if let Some(data_columns_recv) = data_columns_recv {
self.availability_cache.put_computed_data_columns_recv(
block_root,
block_epoch,
data_columns_recv,
&self.log,
)
} else {
let seen_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
self.availability_cache.put_kzg_verified_blobs(
block_root,
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp),
&self.log,
)
}
let seen_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
self.availability_cache.put_kzg_verified_blobs(
block_root,
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp),
&self.log,
)
}

/// Puts a data column receiver for the block into the availability cache.
pub fn put_data_column_receiver(
&self,
block_root: Hash256,
block_epoch: Epoch,
data_columns_recv: oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache.put_computed_data_columns_recv(
block_root,
block_epoch,
data_columns_recv,
&self.log,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand Down
Loading
Loading