@@ -31,6 +31,7 @@ use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
31
31
use crate :: eth1_finalization_cache:: { Eth1FinalizationCache , Eth1FinalizationData } ;
32
32
use crate :: events:: ServerSentEventHandler ;
33
33
use crate :: execution_payload:: { get_execution_payload, NotifyExecutionLayer , PreparePayloadHandle } ;
34
+ use crate :: fetch_blobs:: EngineGetBlobsOutput ;
34
35
use crate :: fork_choice_signal:: { ForkChoiceSignalRx , ForkChoiceSignalTx , ForkChoiceWaitResult } ;
35
36
use crate :: graffiti_calculator:: GraffitiCalculator ;
36
37
use crate :: kzg_utils:: reconstruct_blobs;
@@ -121,7 +122,6 @@ use store::{
121
122
KeyValueStore , KeyValueStoreOp , StoreItem , StoreOp ,
122
123
} ;
123
124
use task_executor:: { ShutdownReason , TaskExecutor } ;
124
- use tokio:: sync:: oneshot;
125
125
use tokio_stream:: Stream ;
126
126
use tracing:: { debug, error, info, trace, warn} ;
127
127
use tree_hash:: TreeHash ;
@@ -3137,16 +3137,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3137
3137
}
3138
3138
3139
3139
/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
3140
- ///
3141
- /// `data_column_recv`: An optional receiver for `DataColumnSidecarList`.
3142
- /// If PeerDAS is enabled, this receiver will be provided and used to send
3143
- /// the `DataColumnSidecar`s once they have been successfully computed.
3144
3140
pub async fn process_engine_blobs (
3145
3141
self : & Arc < Self > ,
3146
3142
slot : Slot ,
3147
3143
block_root : Hash256 ,
3148
- blobs : FixedBlobSidecarList < T :: EthSpec > ,
3149
- data_column_recv : Option < oneshot:: Receiver < DataColumnSidecarList < T :: EthSpec > > > ,
3144
+ engine_get_blobs_output : EngineGetBlobsOutput < T :: EthSpec > ,
3150
3145
) -> Result < AvailabilityProcessingStatus , BlockError > {
3151
3146
// If this block has already been imported to forkchoice it must have been available, so
3152
3147
// we don't need to process its blobs again.
@@ -3160,15 +3155,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3160
3155
3161
3156
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
3162
3157
// consumers don't expect the blobs event to fire erratically.
3163
- if !self
3164
- . spec
3165
- . is_peer_das_enabled_for_epoch ( slot. epoch ( T :: EthSpec :: slots_per_epoch ( ) ) )
3166
- {
3158
+ if let EngineGetBlobsOutput :: Blobs ( blobs) = & engine_get_blobs_output {
3167
3159
self . emit_sse_blob_sidecar_events ( & block_root, blobs. iter ( ) . flatten ( ) . map ( Arc :: as_ref) ) ;
3168
3160
}
3169
3161
3170
3162
let r = self
3171
- . check_engine_blob_availability_and_import ( slot, block_root, blobs , data_column_recv )
3163
+ . check_engine_blobs_availability_and_import ( slot, block_root, engine_get_blobs_output )
3172
3164
. await ;
3173
3165
self . remove_notified ( & block_root, r)
3174
3166
}
@@ -3618,20 +3610,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3618
3610
. await
3619
3611
}
3620
3612
3621
- async fn check_engine_blob_availability_and_import (
3613
+ async fn check_engine_blobs_availability_and_import (
3622
3614
self : & Arc < Self > ,
3623
3615
slot : Slot ,
3624
3616
block_root : Hash256 ,
3625
- blobs : FixedBlobSidecarList < T :: EthSpec > ,
3626
- data_column_recv : Option < oneshot:: Receiver < DataColumnSidecarList < T :: EthSpec > > > ,
3617
+ engine_get_blobs_output : EngineGetBlobsOutput < T :: EthSpec > ,
3627
3618
) -> Result < AvailabilityProcessingStatus , BlockError > {
3628
- self . check_blobs_for_slashability ( block_root, & blobs) ?;
3629
- let availability = self . data_availability_checker . put_engine_blobs (
3630
- block_root,
3631
- slot. epoch ( T :: EthSpec :: slots_per_epoch ( ) ) ,
3632
- blobs,
3633
- data_column_recv,
3634
- ) ?;
3619
+ let availability = match engine_get_blobs_output {
3620
+ EngineGetBlobsOutput :: Blobs ( blobs) => {
3621
+ self . check_blobs_for_slashability ( block_root, & blobs) ?;
3622
+ self . data_availability_checker
3623
+ . put_engine_blobs ( block_root, blobs) ?
3624
+ }
3625
+ EngineGetBlobsOutput :: CustodyColumns ( data_columns) => {
3626
+ self . check_columns_for_slashability ( block_root, & data_columns) ?;
3627
+ self . data_availability_checker
3628
+ . put_engine_data_columns ( block_root, data_columns) ?
3629
+ }
3630
+ } ;
3635
3631
3636
3632
self . process_availability ( slot, availability, || Ok ( ( ) ) )
3637
3633
. await
@@ -3645,27 +3641,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3645
3641
block_root : Hash256 ,
3646
3642
custody_columns : DataColumnSidecarList < T :: EthSpec > ,
3647
3643
) -> Result < AvailabilityProcessingStatus , BlockError > {
3648
- // Need to scope this to ensure the lock is dropped before calling `process_availability`
3649
- // Even an explicit drop is not enough to convince the borrow checker.
3650
- {
3651
- let mut slashable_cache = self . observed_slashable . write ( ) ;
3652
- // Assumes all items in custody_columns are for the same block_root
3653
- if let Some ( column) = custody_columns. first ( ) {
3654
- let header = & column. signed_block_header ;
3655
- if verify_header_signature :: < T , BlockError > ( self , header) . is_ok ( ) {
3656
- slashable_cache
3657
- . observe_slashable (
3658
- header. message . slot ,
3659
- header. message . proposer_index ,
3660
- block_root,
3661
- )
3662
- . map_err ( |e| BlockError :: BeaconChainError ( e. into ( ) ) ) ?;
3663
- if let Some ( slasher) = self . slasher . as_ref ( ) {
3664
- slasher. accept_block_header ( header. clone ( ) ) ;
3665
- }
3666
- }
3667
- }
3668
- }
3644
+ self . check_columns_for_slashability ( block_root, & custody_columns) ?;
3669
3645
3670
3646
// This slot value is purely informative for the consumers of
3671
3647
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
@@ -3677,6 +3653,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3677
3653
. await
3678
3654
}
3679
3655
3656
+ fn check_columns_for_slashability (
3657
+ self : & Arc < Self > ,
3658
+ block_root : Hash256 ,
3659
+ custody_columns : & DataColumnSidecarList < T :: EthSpec > ,
3660
+ ) -> Result < ( ) , BlockError > {
3661
+ let mut slashable_cache = self . observed_slashable . write ( ) ;
3662
+ // Assumes all items in custody_columns are for the same block_root
3663
+ if let Some ( column) = custody_columns. first ( ) {
3664
+ let header = & column. signed_block_header ;
3665
+ if verify_header_signature :: < T , BlockError > ( self , header) . is_ok ( ) {
3666
+ slashable_cache
3667
+ . observe_slashable (
3668
+ header. message . slot ,
3669
+ header. message . proposer_index ,
3670
+ block_root,
3671
+ )
3672
+ . map_err ( |e| BlockError :: BeaconChainError ( e. into ( ) ) ) ?;
3673
+ if let Some ( slasher) = self . slasher . as_ref ( ) {
3674
+ slasher. accept_block_header ( header. clone ( ) ) ;
3675
+ }
3676
+ }
3677
+ }
3678
+ Ok ( ( ) )
3679
+ }
3680
+
3680
3681
/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
3681
3682
///
3682
3683
/// An error is returned if the block was unable to be imported. It may be partially imported
@@ -5798,15 +5799,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
5798
5799
let kzg_proofs = Vec :: from ( proofs) ;
5799
5800
5800
5801
let kzg = self . kzg . as_ref ( ) ;
5801
-
5802
- // TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing.
5803
- kzg_utils:: validate_blobs :: < T :: EthSpec > (
5804
- kzg,
5805
- expected_kzg_commitments,
5806
- blobs. iter ( ) . collect ( ) ,
5807
- & kzg_proofs,
5808
- )
5809
- . map_err ( BlockProductionError :: KzgError ) ?;
5802
+ if self
5803
+ . spec
5804
+ . is_peer_das_enabled_for_epoch ( slot. epoch ( T :: EthSpec :: slots_per_epoch ( ) ) )
5805
+ {
5806
+ kzg_utils:: validate_blobs_and_cell_proofs :: < T :: EthSpec > (
5807
+ kzg,
5808
+ blobs. iter ( ) . collect ( ) ,
5809
+ & kzg_proofs,
5810
+ expected_kzg_commitments,
5811
+ )
5812
+ . map_err ( BlockProductionError :: KzgError ) ?;
5813
+ } else {
5814
+ kzg_utils:: validate_blobs :: < T :: EthSpec > (
5815
+ kzg,
5816
+ expected_kzg_commitments,
5817
+ blobs. iter ( ) . collect ( ) ,
5818
+ & kzg_proofs,
5819
+ )
5820
+ . map_err ( BlockProductionError :: KzgError ) ?;
5821
+ }
5810
5822
5811
5823
Some ( ( kzg_proofs. into ( ) , blobs) )
5812
5824
}
@@ -7118,27 +7130,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
7118
7130
) ;
7119
7131
Ok ( Some ( StoreOp :: PutDataColumns ( block_root, data_columns) ) )
7120
7132
}
7121
- AvailableBlockData :: DataColumnsRecv ( data_column_recv) => {
7122
- // Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
7123
- let _column_recv_timer =
7124
- metrics:: start_timer ( & metrics:: BLOCK_PROCESSING_DATA_COLUMNS_WAIT ) ;
7125
- // Unable to receive data columns from sender, sender is either dropped or
7126
- // failed to compute data columns from blobs. We restore fork choice here and
7127
- // return to avoid inconsistency in database.
7128
- let computed_data_columns = data_column_recv
7129
- . blocking_recv ( )
7130
- . map_err ( |e| format ! ( "Did not receive data columns from sender: {e:?}" ) ) ?;
7131
- debug ! (
7132
- %block_root,
7133
- count = computed_data_columns. len( ) ,
7134
- "Writing data columns to store"
7135
- ) ;
7136
- // TODO(das): Store only this node's custody columns
7137
- Ok ( Some ( StoreOp :: PutDataColumns (
7138
- block_root,
7139
- computed_data_columns,
7140
- ) ) )
7141
- }
7142
7133
}
7143
7134
}
7144
7135
}
0 commit comments