Commit 828794d

Data availability sampling on sync
1 parent f9b4202 commit 828794d

15 files changed: +1225 -77 lines changed


beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 14 additions & 0 deletions
@@ -2855,6 +2855,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         ChainSegmentResult::Successful { imported_blocks }
     }
 
+    pub async fn process_sampling_result(
+        self: &Arc<Self>,
+        block_root: Hash256,
+        sampling_successful: bool,
+    ) {
+        // TODO(das): update fork-choice
+        // TODO(das): these log levels are too high; keep them to debug important events for now
+        if sampling_successful {
+            info!(self.log, "Sampling successful"; "block_root" => %block_root);
+        } else {
+            warn!(self.log, "Sampling failed"; "block_root" => %block_root);
+        }
+    }
+
     /// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
     /// gossip network. The block is not imported into the chain, it is just partially verified.
     ///

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 31 additions & 0 deletions
@@ -2588,3 +2588,34 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
     }
     (block, blob_sidecars)
 }
+
+pub fn generate_rand_block_and_data_columns<E: EthSpec>(
+    fork_name: ForkName,
+    num_blobs: NumBlobs,
+    rng: &mut impl Rng,
+) -> (
+    SignedBeaconBlock<E, FullPayload<E>>,
+    Vec<DataColumnSidecar<E>>,
+) {
+    let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
+    let blob = blobs.first().expect("should have at least 1 blob");
+
+    // TODO(das): do not hardcode
+    let data_columns = (0..64)
+        .map(|index| DataColumnSidecar {
+            index,
+            column: <_>::default(),
+            kzg_commitments: block
+                .message()
+                .body()
+                .blob_kzg_commitments()
+                .unwrap()
+                .clone(),
+            kzg_proofs: (vec![]).into(),
+            signed_block_header: blob.signed_block_header.clone(),
+            kzg_commitments_inclusion_proof: <_>::default(),
+        })
+        .collect::<Vec<_>>();
+
+    (block, data_columns)
+}
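
For context, a hypothetical test using the new helper might look like the following sketch (not part of this commit; the fork name, the NumBlobs variant and the RNG setup are assumptions):

    // Sketch only: exercise the new test helper.
    let mut rng = rand::thread_rng();
    let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
        ForkName::Deneb,     // a fork with blobs; assumed valid input
        NumBlobs::Number(1), // variant name assumed; must yield >= 1 blob or the helper's `expect` panics
        &mut rng,
    );
    // One sidecar per column (count hardcoded to 64 for now, per the TODO above),
    // each carrying the block's commitments and signed header.
    assert_eq!(data_columns.len(), 64);
    assert!(data_columns
        .iter()
        .all(|d| d.signed_block_header == data_columns[0].signed_block_header));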

beacon_node/beacon_processor/src/lib.rs

Lines changed: 22 additions & 3 deletions
@@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
 /// will be stored before we start dropping them.
 const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;
 
+/// TODO(das): Placeholder number
+const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000;
+const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000;
+
 /// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
 /// be stored before we start dropping them.
 const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
@@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
 pub const RPC_BLOCK: &str = "rpc_block";
 pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
 pub const RPC_BLOBS: &str = "rpc_blob";
+pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
+pub const SAMPLING_RESULT: &str = "sampling_result";
 pub const CHAIN_SEGMENT: &str = "chain_segment";
 pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
 pub const STATUS_PROCESSING: &str = "status_processing";
@@ -629,6 +635,8 @@ pub enum Work<E: EthSpec> {
     RpcBlobs {
         process_fn: AsyncFn,
     },
+    RpcVerifyDataColumn(AsyncFn),
+    SamplingResult(AsyncFn),
     IgnoredRpcBlock {
         process_fn: BlockingFn,
     },
@@ -675,6 +683,8 @@ impl<E: EthSpec> Work<E> {
             Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
             Work::RpcBlock { .. } => RPC_BLOCK,
             Work::RpcBlobs { .. } => RPC_BLOBS,
+            Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
+            Work::SamplingResult(_) => SAMPLING_RESULT,
             Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
             Work::ChainSegment { .. } => CHAIN_SEGMENT,
             Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
@@ -833,6 +843,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
         // Using a FIFO queue since blocks need to be imported sequentially.
         let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
         let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
+        let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN);
+        let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN);
         let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
@@ -1278,6 +1290,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         rpc_block_queue.push(work, work_id, &self.log)
                     }
                     Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
+                    Work::RpcVerifyDataColumn(_) => {
+                        rpc_verify_data_column_queue.push(work, work_id, &self.log)
+                    }
+                    Work::SamplingResult(_) => {
+                        sampling_result_queue.push(work, work_id, &self.log)
+                    }
                     Work::ChainSegment { .. } => {
                         chain_segment_queue.push(work, work_id, &self.log)
                     }
@@ -1510,9 +1528,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         beacon_block_root: _,
                         process_fn,
                     } => task_spawner.spawn_async(process_fn),
-                    Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
-                        task_spawner.spawn_async(process_fn)
-                    }
+                    Work::RpcBlock { process_fn }
+                    | Work::RpcBlobs { process_fn }
+                    | Work::RpcVerifyDataColumn(process_fn)
+                    | Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
                     Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
                     Work::GossipBlock(work)
                     | Work::GossipBlobSidecar(work)

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 8 additions & 0 deletions
@@ -378,6 +378,14 @@ pub struct DataColumnsByRootRequest {
     pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
 }
 
+impl DataColumnsByRootRequest {
+    pub fn new(blob_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
+        let data_column_ids =
+            RuntimeVariableList::from_vec(blob_ids, spec.max_request_data_column_sidecars as usize);
+        Self { data_column_ids }
+    }
+}
+
 /* RPC Handling and Grouping */
 // Collection of enums and structs used by the Codecs to encode/decode RPC messages
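
For illustration, a caller requesting two specific columns of one block could build the request roughly as follows (a sketch, not part of this commit; `block_root` and `spec` are assumed to be in scope, and the `DataColumnIdentifier` field names are taken from its definition elsewhere in the tree):

    // Sketch only: request columns 0 and 1 for a single block root.
    let ids = vec![
        DataColumnIdentifier { block_root, index: 0 },
        DataColumnIdentifier { block_root, index: 1 },
    ];
    // The resulting list is bounded by `spec.max_request_data_column_sidecars`.
    let request = DataColumnsByRootRequest::new(ids, &spec);
    assert_eq!(request.data_column_ids.len(), 2);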

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 38 additions & 1 deletion
@@ -1,6 +1,6 @@
 use crate::{
     service::NetworkMessage,
-    sync::{manager::BlockProcessType, SyncMessage},
+    sync::{manager::BlockProcessType, SamplingId, SyncMessage},
 };
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
@@ -478,6 +478,43 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         })
     }
 
+    /// Create a new `Work` event for some data columns received over ReqResp.
+    pub fn send_rpc_data_columns(
+        self: &Arc<Self>,
+        block_root: Hash256,
+        data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
+        seen_timestamp: Duration,
+        id: SamplingId,
+    ) -> Result<(), Error<T::EthSpec>> {
+        let nbp = self.clone();
+        self.try_send(BeaconWorkEvent {
+            drop_during_sync: false,
+            work: Work::RpcVerifyDataColumn(Box::pin(async move {
+                let result = nbp
+                    .clone()
+                    .validate_rpc_data_columns(block_root, data_columns, seen_timestamp)
+                    .await;
+                // Sync handles these results
+                nbp.send_sync_message(SyncMessage::SampleVerified { id, result });
+            })),
+        })
+    }
+
+    pub fn send_sampling_result(
+        self: &Arc<Self>,
+        block_root: Hash256,
+        sampling_result: Result<(), String>,
+    ) -> Result<(), Error<T::EthSpec>> {
+        let nbp = self.clone();
+        self.try_send(BeaconWorkEvent {
+            drop_during_sync: false,
+            work: Work::SamplingResult(Box::pin(async move {
+                nbp.process_sampling_result(block_root, sampling_result)
+                    .await;
+            })),
+        })
+    }
+
     /// Create a new work event to import `blocks` as a beacon chain segment.
     pub fn send_chain_segment(
         self: &Arc<Self>,
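
Taken together, the two senders above sketch the sampling round trip: sync hands columns received from a peer to the beacon processor for verification, gets the verdict back as `SyncMessage::SampleVerified`, and eventually reports the overall outcome for the block. A hypothetical sync-side caller might look like this (not part of this commit; the handler names and the `network_beacon_processor` handle are assumptions):

    // Sketch only: sync-side glue around the two new senders.
    fn on_sample_columns_received(
        &mut self,
        id: SamplingId,
        block_root: Hash256,
        columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
        seen_timestamp: Duration,
    ) {
        // Queue KZG verification on the beacon processor; the result comes back
        // to sync as `SyncMessage::SampleVerified { id, result }`.
        let _ = self
            .network_beacon_processor
            .send_rpc_data_columns(block_root, columns, seen_timestamp, id);
    }

    fn on_sampling_complete(&self, block_root: Hash256, result: Result<(), String>) {
        // Report the final verdict; for now `BeaconChain::process_sampling_result`
        // only logs it, fork-choice integration is still a TODO.
        let _ = self
            .network_beacon_processor
            .send_sampling_result(block_root, result);
    }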

beacon_node/network/src/network_beacon_processor/sync_methods.rs

Lines changed: 23 additions & 1 deletion
@@ -24,7 +24,7 @@ use store::KzgCommitment;
 use tokio::sync::mpsc;
 use types::beacon_block_body::format_kzg_commitments;
 use types::blob_sidecar::FixedBlobSidecarList;
-use types::{Epoch, Hash256};
+use types::{DataColumnSidecar, Epoch, Hash256};
 
 /// Id associated to a batch processing request, either a sync batch or a parent lookup.
 #[derive(Clone, Debug, PartialEq)]
@@ -305,6 +305,28 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         });
     }
 
+    /// Validate a list of data columns received from RPC requests
+    pub async fn validate_rpc_data_columns(
+        self: Arc<NetworkBeaconProcessor<T>>,
+        _block_root: Hash256,
+        _data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
+        _seen_timestamp: Duration,
+    ) -> Result<(), String> {
+        // TODO(das): validate data column sidecar KZG commitment
+        Ok(())
+    }
+
+    /// Process a sampling result, inserting it into fork-choice
+    pub async fn process_sampling_result(
+        self: Arc<NetworkBeaconProcessor<T>>,
+        block_root: Hash256,
+        sampling_result: Result<(), String>,
+    ) {
+        self.chain
+            .process_sampling_result(block_root, sampling_result.is_ok())
+            .await;
+    }
+
     /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
     /// thread if more blocks are needed to process it.
     pub async fn process_chain_segment(

beacon_node/network/src/router.rs

Lines changed: 7 additions & 15 deletions
@@ -514,11 +514,11 @@ impl<T: BeaconChainTypes> Router<T> {
     ) {
         let request_id = match request_id {
             RequestId::Sync(sync_id) => match sync_id {
-                SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
-                    crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
+                id @ SyncId::RangeBlockAndBlobs { .. } => id,
+                other => {
+                    crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
                     return;
                 }
-                id @ SyncId::RangeBlockAndBlobs { .. } => id,
             },
             RequestId::Router => {
                 crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
@@ -577,12 +577,8 @@ impl<T: BeaconChainTypes> Router<T> {
         let request_id = match request_id {
             RequestId::Sync(sync_id) => match sync_id {
                 id @ SyncId::SingleBlock { .. } => id,
-                SyncId::RangeBlockAndBlobs { .. } => {
-                    crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
-                    return;
-                }
-                SyncId::SingleBlob { .. } => {
-                    crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
+                other => {
+                    crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other);
                     return;
                 }
             },
@@ -615,12 +611,8 @@ impl<T: BeaconChainTypes> Router<T> {
         let request_id = match request_id {
             RequestId::Sync(sync_id) => match sync_id {
                 id @ SyncId::SingleBlob { .. } => id,
-                SyncId::SingleBlock { .. } => {
-                    crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
-                    return;
-                }
-                SyncId::RangeBlockAndBlobs { .. } => {
-                    crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
+                other => {
+                    crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other);
                     return;
                 }
             },

beacon_node/network/src/sync/block_lookups/mod.rs

Lines changed: 2 additions & 32 deletions
@@ -641,7 +641,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         error: RPCError,
     ) {
         // Downscore peer even if lookup is not known
-        self.downscore_on_rpc_error(peer_id, &error, cx);
+        cx.report_peer_on_rpc_error(peer_id, &error);
 
         let Some(mut parent_lookup) = self.get_parent_lookup::<R>(id) else {
             debug!(self.log,
@@ -674,7 +674,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         error: RPCError,
     ) {
         // Downscore peer even if lookup is not known
-        self.downscore_on_rpc_error(peer_id, &error, cx);
+        cx.report_peer_on_rpc_error(peer_id, &error);
 
         let log = self.log.clone();
         let Some(mut lookup) = self.get_single_lookup::<R>(id) else {
@@ -1330,34 +1330,4 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             self.parent_lookups.len() as i64,
         );
     }
-
-    pub fn downscore_on_rpc_error(
-        &self,
-        peer_id: &PeerId,
-        error: &RPCError,
-        cx: &SyncNetworkContext<T>,
-    ) {
-        // Note: logging the report event here with the full error display. The log inside
-        // `report_peer` only includes a smaller string, like "invalid_data"
-        debug!(self.log, "reporting peer for sync lookup error"; "error" => %error);
-        if let Some(action) = match error {
-            // Protocol errors are heavily penalized
-            RPCError::SSZDecodeError(..)
-            | RPCError::IoError(..)
-            | RPCError::ErrorResponse(..)
-            | RPCError::InvalidData(..)
-            | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError),
-            // Timing / network errors are less penalized
-            // TODO: Is IoError a protocol error or network error?
-            RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => {
-                Some(PeerAction::MidToleranceError)
-            }
-            // Not supporting a specific protocol is tolerated. TODO: Are you sure?
-            RPCError::UnsupportedProtocol => None,
-            // Our fault, don't penalize peer
-            RPCError::InternalError(..) | RPCError::Disconnected => None,
-        } {
-            cx.report_peer(*peer_id, action, error.into());
-        }
-    }
 }
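
The call sites above now delegate to `cx.report_peer_on_rpc_error`, so the deleted mapping presumably moves onto `SyncNetworkContext`. A sketch of that method, reconstructed from the removed body (its actual location and signature are not visible in this diff):

    // Sketch only: the peer-scoring mapping from the deleted `downscore_on_rpc_error`,
    // expressed as a method on `SyncNetworkContext` (assumed destination of the move).
    pub fn report_peer_on_rpc_error(&self, peer_id: &PeerId, error: &RPCError) {
        // Log the report here with the full error display; the log inside
        // `report_peer` only includes a shorter string such as "invalid_data".
        debug!(self.log, "Reporting peer for sync lookup error"; "error" => %error);
        if let Some(action) = match error {
            // Protocol errors are heavily penalized.
            RPCError::SSZDecodeError(..)
            | RPCError::IoError(..)
            | RPCError::ErrorResponse(..)
            | RPCError::InvalidData(..)
            | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError),
            // Timing / network errors are penalized less.
            RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => {
                Some(PeerAction::MidToleranceError)
            }
            // Not supporting a specific protocol is tolerated.
            RPCError::UnsupportedProtocol => None,
            // Our fault, don't penalize the peer.
            RPCError::InternalError(..) | RPCError::Disconnected => None,
        } {
            self.report_peer(*peer_id, action, error.into());
        }
    }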
