Skip to content
Closed
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2566,7 +2566,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<Vec<Option<SyncDuty>>, Error> {
) -> Result<Vec<Result<Option<SyncDuty>, BeaconStateError>>, Error> {
self.with_head(move |head| {
head.beacon_state
.get_sync_committee_duties(epoch, validator_indices, &self.spec)
Expand Down Expand Up @@ -2891,7 +2891,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.data_availability_checker
.notify_gossip_blob(blob.slot(), block_root, &blob);
.notify_gossip_blob(block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -2925,7 +2925,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
.notify_rpc_blobs(block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
Expand Down
31 changes: 6 additions & 25 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};

mod availability_view;
mod child_components;
Expand Down Expand Up @@ -356,41 +356,30 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
/// our blob download requirements. We will also serve this over RPC.
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
let slot = block.slot();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.or_default()
.merge_block(block);
}

/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_gossip_blob(
&self,
slot: Slot,
block_root: Hash256,
blob: &GossipVerifiedBlob<T>,
) {
pub fn notify_gossip_blob(&self, block_root: Hash256, blob: &GossipVerifiedBlob<T>) {
let index = blob.index();
let commitment = blob.kzg_commitment();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.or_default()
.merge_single_blob(index as usize, commitment);
}

/// Adds blob commitments to the processing cache. These commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_rpc_blobs(
&self,
slot: Slot,
block_root: Hash256,
blobs: &FixedBlobSidecarList<T::EthSpec>,
) {
pub fn notify_rpc_blobs(&self, block_root: Hash256, blobs: &FixedBlobSidecarList<T::EthSpec>) {
let mut commitments = KzgCommitmentOpts::<T::EthSpec>::default();
for blob in blobs.iter().flatten() {
if let Some(commitment) = commitments.get_mut(blob.index as usize) {
Expand All @@ -400,7 +389,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.or_default()
.merge_blobs(commitments);
}

Expand All @@ -409,14 +398,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.processing_cache.write().remove(block_root)
}

/// Gather all block roots for which we are not currently processing all components for the
/// given slot.
pub fn incomplete_processing_components(&self, slot: Slot) -> Vec<Hash256> {
self.processing_cache
.read()
.incomplete_processing_components(slot)
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use types::beacon_block_body::KzgCommitmentOpts;
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{EthSpec, Hash256, SignedBeaconBlock};

/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
/// a view of what we have and what we require. This cache serves a slightly different purpose than
Expand All @@ -29,23 +29,13 @@ impl<E: EthSpec> ProcessingCache<E> {
.get(block_root)
.map_or(false, |b| b.block_exists())
}
pub fn incomplete_processing_components(&self, slot: Slot) -> Vec<Hash256> {
let mut roots_missing_components = vec![];
for (&block_root, info) in self.processing_cache.iter() {
if info.slot == slot && !info.is_available() {
roots_missing_components.push(block_root);
}
}
roots_missing_components
}
pub fn len(&self) -> usize {
self.processing_cache.len()
}
}

#[derive(Debug, Clone)]
#[derive(Default, Debug, Clone)]
pub struct ProcessingComponents<E: EthSpec> {
slot: Slot,
/// Blobs required for a block can only be known if we have seen the block. So `Some` here
/// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure
/// out whether incoming blobs actually match the block.
Expand All @@ -56,12 +46,8 @@ pub struct ProcessingComponents<E: EthSpec> {
}

impl<E: EthSpec> ProcessingComponents<E> {
pub fn new(slot: Slot) -> Self {
Self {
slot,
block: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
}
pub fn new() -> Self {
Self::default()
}
}

Expand All @@ -70,7 +56,6 @@ impl<E: EthSpec> ProcessingComponents<E> {
impl<E: EthSpec> ProcessingComponents<E> {
pub fn empty(_block_root: Hash256) -> Self {
Self {
slot: Slot::new(0),
block: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
}
Expand Down
53 changes: 50 additions & 3 deletions beacon_node/http_api/src/sync_committees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ pub fn sync_committee_duties<T: BeaconChainTypes>(
// the vast majority of requests. Rather than checking if we think the request will succeed in a
// way prone to data races, we attempt the request immediately and check the error code.
match chain.sync_committee_duties_from_head(request_epoch, request_indices) {
Ok(duties) => return Ok(convert_to_response(duties, execution_optimistic)),
Ok(duties) => {
return Ok(convert_to_response(
verify_unknown_validators(duties, request_epoch, chain)?,
execution_optimistic,
))
}
Err(BeaconChainError::SyncDutiesError(BeaconStateError::SyncCommitteeNotKnown {
..
}))
Expand All @@ -64,7 +69,10 @@ pub fn sync_committee_duties<T: BeaconChainTypes>(
)),
e => warp_utils::reject::beacon_chain_error(e),
})?;
Ok(convert_to_response(duties, execution_optimistic))
Ok(convert_to_response(
verify_unknown_validators(duties, request_epoch, chain)?,
execution_optimistic,
))
}

/// Slow path for duties: load a state and use it to compute the duties.
Expand All @@ -73,7 +81,7 @@ fn duties_from_state_load<T: BeaconChainTypes>(
request_indices: &[u64],
altair_fork_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<Vec<Option<SyncDuty>>, BeaconChainError> {
) -> Result<Vec<Result<Option<SyncDuty>, BeaconStateError>>, BeaconChainError> {
// Determine what the current epoch would be if we fast-forward our system clock by
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
//
Expand Down Expand Up @@ -121,6 +129,45 @@ fn duties_from_state_load<T: BeaconChainTypes>(
}
}

fn verify_unknown_validators<T: BeaconChainTypes>(
duties: Vec<Result<Option<SyncDuty>, BeaconStateError>>,
request_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<Vec<Option<SyncDuty>>, warp::reject::Rejection> {
// Lazily load the request_epoch_state, as it is only needed if there are any UnknownValidator
let mut request_epoch_state = None;

duties
.into_iter()
.map(|res| {
res.or_else(|err| {
// Make sure the validator is really unknown w.r.t. the request_epoch
if let BeaconStateError::UnknownValidator(idx) = err {
let request_epoch_state = match &mut request_epoch_state {
Some(state) => state,
None => request_epoch_state.insert(chain.state_at_slot(
request_epoch.start_slot(T::EthSpec::slots_per_epoch()),
StateSkipConfig::WithoutStateRoots,
)?),
};
request_epoch_state
.get_validator(idx)
.map_err(BeaconChainError::SyncDutiesError)
.map(|_| None)
} else {
Err(BeaconChainError::SyncDutiesError(err))
}
})
})
.collect::<Result<Vec<_>, _>>()
.map_err(|err| match err {
BeaconChainError::SyncDutiesError(BeaconStateError::UnknownValidator(idx)) => {
warp_utils::reject::custom_bad_request(format!("invalid validator index: {idx}"))
}
e => warp_utils::reject::beacon_chain_error(e),
})
}

fn convert_to_response(duties: Vec<Option<SyncDuty>>, execution_optimistic: bool) -> SyncDuties {
api_types::GenericResponse::from(duties.into_iter().flatten().collect::<Vec<_>>())
.add_execution_optimistic(execution_optimistic)
Expand Down
6 changes: 3 additions & 3 deletions consensus/types/src/beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,10 +960,10 @@ impl<T: EthSpec> BeaconState<T> {
epoch: Epoch,
validator_indices: &[u64],
spec: &ChainSpec,
) -> Result<Vec<Option<SyncDuty>>, Error> {
) -> Result<Vec<Result<Option<SyncDuty>, Error>>, Error> {
let sync_committee = self.get_built_sync_committee(epoch, spec)?;

validator_indices
Ok(validator_indices
.iter()
.map(|&validator_index| {
let pubkey = self.get_validator(validator_index as usize)?.pubkey;
Expand All @@ -974,7 +974,7 @@ impl<T: EthSpec> BeaconState<T> {
sync_committee,
))
})
.collect()
.collect())
}

/// Get the canonical root of the `latest_block_header`, filling in its state root if necessary.
Expand Down
3 changes: 1 addition & 2 deletions scripts/local_testnet/geth.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ exec $GETH_BINARY \
--bootnodes $EL_BOOTNODE_ENODE \
--port $network_port \
--http.port $http_port \
--authrpc.port $auth_port \
2>&1 | tee $data_dir/geth.log
--authrpc.port $auth_port