Skip to content

Commit b06c23e

Browse files
committed
Reconstruct data columns without blocking processing and import (#5990)
Squashed commit of the following: commit 7d2d826 Author: Jimmy Chen <[email protected]> Date: Mon Jul 1 13:44:45 2024 +1000 Send import results to sync after reconstruction. Add more logging and metrics. commit 4b30ebe Merge: f93e2b5 7206909 Author: Jimmy Chen <[email protected]> Date: Fri Jun 28 17:23:22 2024 +1000 Merge branch 'das' into fork/reconstruct-without-blocking-import commit f93e2b5 Author: Jimmy Chen <[email protected]> Date: Fri Jun 28 14:42:04 2024 +1000 Code cleanup: add type aliases and update comments. commit 6ac055d Author: Jimmy Chen <[email protected]> Date: Fri Jun 28 14:26:40 2024 +1000 Revert reconstruction behaviour to always go ahead rather than allowing one at a time. Address other review comments. commit 1e3964e Author: Jimmy Chen <[email protected]> Date: Tue Jun 25 00:02:19 2024 +1000 Reconstruct columns without blocking processing and import.
1 parent 7206909 commit b06c23e

File tree

11 files changed

+321
-227
lines changed

11 files changed

+321
-227
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
2222
use crate::chain_config::ChainConfig;
2323
use crate::data_availability_checker::{
2424
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
25-
DataColumnsToPublish,
2625
};
2726
use crate::data_column_verification::{
2827
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
@@ -3057,13 +3056,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30573056
pub async fn process_gossip_data_columns(
30583057
self: &Arc<Self>,
30593058
data_columns: Vec<GossipVerifiedDataColumn<T>>,
3060-
) -> Result<
3061-
(
3062-
AvailabilityProcessingStatus,
3063-
DataColumnsToPublish<T::EthSpec>,
3064-
),
3065-
BlockError<T::EthSpec>,
3066-
> {
3059+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
30673060
let Ok(block_root) = data_columns
30683061
.iter()
30693062
.map(|c| c.block_root())
@@ -3131,13 +3124,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31313124
self: &Arc<Self>,
31323125
block_root: Hash256,
31333126
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
3134-
) -> Result<
3135-
(
3136-
AvailabilityProcessingStatus,
3137-
DataColumnsToPublish<T::EthSpec>,
3138-
),
3139-
BlockError<T::EthSpec>,
3140-
> {
3127+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
31413128
// If this block has already been imported to forkchoice it must have been available, so
31423129
// we don't need to process its columns again.
31433130
if self
@@ -3162,6 +3149,52 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31623149
self.remove_notified_custody_columns(&block_root, r)
31633150
}
31643151

3152+
pub async fn reconstruct_data_columns(
3153+
self: &Arc<Self>,
3154+
block_root: Hash256,
3155+
) -> Result<
3156+
Option<(
3157+
AvailabilityProcessingStatus,
3158+
DataColumnSidecarVec<T::EthSpec>,
3159+
)>,
3160+
BlockError<T::EthSpec>,
3161+
> {
3162+
// As of now we only reconstruct data columns on supernodes, so if the block is already
3163+
// available on a supernode, there's no need to reconstruct as the node must already have
3164+
// all columns.
3165+
if self
3166+
.canonical_head
3167+
.fork_choice_read_lock()
3168+
.contains_block(&block_root)
3169+
{
3170+
return Ok(None);
3171+
}
3172+
3173+
let Some((availability, data_column_to_publish)) = self
3174+
.data_availability_checker
3175+
.reconstruct_data_columns(block_root)?
3176+
else {
3177+
return Ok(None);
3178+
};
3179+
3180+
let Ok(slot) = data_column_to_publish
3181+
.iter()
3182+
.map(|c| c.slot())
3183+
.unique()
3184+
.exactly_one()
3185+
else {
3186+
return Err(BlockError::InternalError(
3187+
"Columns for the same block should have matching slot".to_string(),
3188+
));
3189+
};
3190+
3191+
let r = self.process_availability(slot, availability).await;
3192+
self.remove_notified_custody_columns(&block_root, r)
3193+
.map(|availability_processing_status| {
3194+
Some((availability_processing_status, data_column_to_publish))
3195+
})
3196+
}
3197+
31653198
/// Remove any block components from the *processing cache* if we no longer require them. If the
31663199
/// block was imported full or erred, we no longer require them.
31673200
fn remove_notified(
@@ -3179,15 +3212,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31793212

31803213
/// Remove any block components from the *processing cache* if we no longer require them. If the
31813214
/// block was imported full or erred, we no longer require them.
3182-
fn remove_notified_custody_columns<P>(
3215+
fn remove_notified_custody_columns(
31833216
&self,
31843217
block_root: &Hash256,
3185-
r: Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>>,
3186-
) -> Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>> {
3187-
let has_missing_components = matches!(
3188-
r,
3189-
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
3190-
);
3218+
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
3219+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
3220+
let has_missing_components =
3221+
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
31913222
if !has_missing_components {
31923223
self.reqresp_pre_import_cache.write().remove(block_root);
31933224
}
@@ -3436,13 +3467,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34363467
async fn check_gossip_data_columns_availability_and_import(
34373468
self: &Arc<Self>,
34383469
data_columns: Vec<GossipVerifiedDataColumn<T>>,
3439-
) -> Result<
3440-
(
3441-
AvailabilityProcessingStatus,
3442-
DataColumnsToPublish<T::EthSpec>,
3443-
),
3444-
BlockError<T::EthSpec>,
3445-
> {
3470+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
34463471
if let Some(slasher) = self.slasher.as_ref() {
34473472
for data_colum in &data_columns {
34483473
slasher.accept_block_header(data_colum.signed_block_header());
@@ -3455,13 +3480,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34553480
));
34563481
};
34573482

3458-
let (availability, data_columns_to_publish) = self
3483+
let availability = self
34593484
.data_availability_checker
34603485
.put_gossip_data_columns(data_columns)?;
34613486

3462-
self.process_availability(slot, availability)
3463-
.await
3464-
.map(|result| (result, data_columns_to_publish))
3487+
self.process_availability(slot, availability).await
34653488
}
34663489

34673490
/// Checks if the provided blobs can make any cached blocks available, and imports immediately
@@ -3509,13 +3532,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35093532
slot: Slot,
35103533
block_root: Hash256,
35113534
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
3512-
) -> Result<
3513-
(
3514-
AvailabilityProcessingStatus,
3515-
DataColumnsToPublish<T::EthSpec>,
3516-
),
3517-
BlockError<T::EthSpec>,
3518-
> {
3535+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
35193536
// Need to scope this to ensure the lock is dropped before calling `process_availability`
35203537
// Even an explicit drop is not enough to convince the borrow checker.
35213538
{
@@ -3539,13 +3556,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35393556
}
35403557
}
35413558
}
3542-
let (availability, data_columns_to_publish) = self
3559+
let availability = self
35433560
.data_availability_checker
35443561
.put_rpc_custody_columns(block_root, custody_columns)?;
35453562

3546-
self.process_availability(slot, availability)
3547-
.await
3548-
.map(|result| (result, data_columns_to_publish))
3563+
self.process_availability(slot, availability).await
35493564
}
35503565

35513566
/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`

beacon_node/beacon_chain/src/block_verification_types.rs

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::block_verification::BlockError;
33
use crate::data_availability_checker::AvailabilityCheckError;
44
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
55
use crate::data_column_verification::{
6-
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
6+
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumnList,
77
};
88
use crate::eth1_finalization_cache::Eth1FinalizationData;
99
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
@@ -15,8 +15,8 @@ use std::sync::Arc;
1515
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
1616
use types::data_column_sidecar::{self};
1717
use types::{
18-
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
19-
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
18+
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
19+
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
2020
};
2121

2222
/// A block that has been received over RPC. It has 2 internal variants:
@@ -74,7 +74,7 @@ impl<E: EthSpec> RpcBlock<E> {
7474
}
7575
}
7676

77-
pub fn custody_columns(&self) -> Option<&CustodyDataColumnList<E>> {
77+
pub fn custody_columns(&self) -> Option<&Vec<CustodyDataColumn<E>>> {
7878
match &self.block {
7979
RpcBlockInner::Block(_) => None,
8080
RpcBlockInner::BlockAndBlobs(_, _) => None,
@@ -96,7 +96,7 @@ enum RpcBlockInner<E: EthSpec> {
9696
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
9797
/// This variant is used with parent lookups and by-range responses. It should have all
9898
/// requested data columns, all block roots matching for this block.
99-
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, CustodyDataColumnList<E>),
99+
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, Vec<CustodyDataColumn<E>>),
100100
}
101101

102102
impl<E: EthSpec> RpcBlock<E> {
@@ -158,7 +158,6 @@ impl<E: EthSpec> RpcBlock<E> {
158158
block_root: Option<Hash256>,
159159
block: Arc<SignedBeaconBlock<E>>,
160160
custody_columns: Vec<CustodyDataColumn<E>>,
161-
spec: &ChainSpec,
162161
) -> Result<Self, AvailabilityCheckError> {
163162
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));
164163

@@ -168,10 +167,7 @@ impl<E: EthSpec> RpcBlock<E> {
168167
}
169168
// Treat empty data column lists as if they are missing.
170169
let inner = if !custody_columns.is_empty() {
171-
RpcBlockInner::BlockAndCustodyColumns(
172-
block,
173-
RuntimeVariableList::new(custody_columns, spec.number_of_columns)?,
174-
)
170+
RpcBlockInner::BlockAndCustodyColumns(block, custody_columns)
175171
} else {
176172
RpcBlockInner::Block(block)
177173
};
@@ -205,7 +201,7 @@ impl<E: EthSpec> RpcBlock<E> {
205201
Hash256,
206202
Arc<SignedBeaconBlock<E>>,
207203
Option<BlobSidecarList<E>>,
208-
Option<CustodyDataColumnList<E>>,
204+
Option<Vec<CustodyDataColumn<E>>>,
209205
) {
210206
let block_root = self.block_root();
211207
match self.block {
@@ -596,7 +592,6 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
596592
}
597593

598594
fn into_rpc_block(self) -> RpcBlock<E> {
599-
let number_of_columns = self.spec.number_of_columns;
600595
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
601596
// Circumvent the constructor here, because an Available block will have already had
602597
// consistency checks performed.
@@ -605,18 +600,14 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
605600
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
606601
(_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
607602
block,
608-
RuntimeVariableList::new(
609-
data_columns
610-
.into_iter()
611-
// TODO(das): This is an ugly hack that should be removed. After updating
612-
// store types to handle custody data columns this should not be required.
613-
// It's okay-ish because available blocks must have all the required custody
614-
// columns.
615-
.map(|d| CustodyDataColumn::from_asserted_custody(d))
616-
.collect(),
617-
number_of_columns,
618-
)
619-
.expect("data column list is within bounds"),
603+
// TODO(das): This is an ugly hack that should be removed. After updating
604+
// store types to handle custody data columns this should not be required.
605+
// It's okay-ish because available blocks must have all the required custody
606+
// columns.
607+
data_columns
608+
.into_iter()
609+
.map(CustodyDataColumn::from_asserted_custody)
610+
.collect(),
620611
),
621612
};
622613
RpcBlock {

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
22
use crate::block_verification_types::{
33
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
44
};
5-
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
5+
use crate::data_availability_checker::overflow_lru_cache::{
6+
AvailabilityAndReconstructedColumns, OverflowLRUCache,
7+
};
68
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
79
use kzg::Kzg;
810
use slog::{debug, error, o, Logger};
@@ -23,6 +25,7 @@ mod error;
2325
mod overflow_lru_cache;
2426
mod state_lru_cache;
2527

28+
use crate::data_availability_checker::error::Error;
2629
use crate::data_column_verification::{
2730
verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn,
2831
KzgVerifiedCustodyDataColumn,
@@ -31,8 +34,6 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh
3134
use types::data_column_sidecar::DataColumnIdentifier;
3235
use types::non_zero_usize::new_non_zero_usize;
3336

34-
pub use self::overflow_lru_cache::DataColumnsToPublish;
35-
3637
/// The LRU Cache stores `PendingComponents` which can store up to
3738
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
3839
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
@@ -159,6 +160,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
159160
self.availability_cache.peek_data_column(data_column_id)
160161
}
161162

163+
pub fn reconstruct_data_columns(
164+
&self,
165+
block_root: Hash256,
166+
) -> Result<Option<AvailabilityAndReconstructedColumns<T::EthSpec>>, Error> {
167+
let Some(kzg) = self.kzg.as_ref() else {
168+
return Err(AvailabilityCheckError::KzgNotInitialized);
169+
};
170+
self.availability_cache
171+
.reconstruct_data_columns(kzg, block_root)
172+
}
173+
162174
/// Put a list of blobs received via RPC into the availability cache. This performs KZG
163175
/// verification on the blobs in the list.
164176
pub fn put_rpc_blobs(
@@ -190,8 +202,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
190202
&self,
191203
block_root: Hash256,
192204
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
193-
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
194-
{
205+
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
195206
let Some(kzg) = self.kzg.as_ref() else {
196207
return Err(AvailabilityCheckError::KzgNotInitialized);
197208
};
@@ -203,11 +214,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
203214
.map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg))
204215
.collect::<Result<Vec<_>, _>>()?;
205216

206-
self.availability_cache.put_kzg_verified_data_columns(
207-
kzg,
208-
block_root,
209-
verified_custody_columns,
210-
)
217+
self.availability_cache
218+
.put_kzg_verified_data_columns(block_root, verified_custody_columns)
211219
}
212220

213221
/// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -232,11 +240,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
232240
pub fn put_gossip_data_columns(
233241
&self,
234242
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
235-
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
236-
{
237-
let Some(kzg) = self.kzg.as_ref() else {
238-
return Err(AvailabilityCheckError::KzgNotInitialized);
239-
};
243+
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
240244
let block_root = gossip_data_columns
241245
.first()
242246
.ok_or(AvailabilityCheckError::MissingCustodyColumns)?
@@ -248,7 +252,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
248252
.collect::<Vec<_>>();
249253

250254
self.availability_cache
251-
.put_kzg_verified_data_columns(kzg, block_root, custody_columns)
255+
.put_kzg_verified_data_columns(block_root, custody_columns)
252256
}
253257

254258
/// Check if we have all the blobs for a block. Returns `Availability` which has information
@@ -314,12 +318,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
314318
block,
315319
blobs: None,
316320
blobs_available_timestamp: None,
317-
data_columns: Some(
318-
data_column_list
319-
.into_iter()
320-
.map(|d| d.clone_arc())
321-
.collect(),
322-
),
321+
data_columns: Some(data_column_list.iter().map(|d| d.clone_arc()).collect()),
323322
spec: self.spec.clone(),
324323
}))
325324
} else {

0 commit comments

Comments
 (0)