Skip to content

Commit 6fb0b2e

Browse files
authored
Sync lookup dedup range and blobs (#5561)
* Handle sync range blocks as blocks and blobs * Merge range sync and backfill sync handling * Update tests * Add no_blobs_into_responses test * Address @realbigsean comments * Merge remote-tracking branch 'origin/unstable' into sync-lookup-dedup-range-and-blobs
1 parent 5fdd3b3 commit 6fb0b2e

File tree

7 files changed

+256
-531
lines changed

7 files changed

+256
-531
lines changed

beacon_node/network/src/router.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,7 @@ impl<T: BeaconChainTypes> Router<T> {
497497
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
498498
return;
499499
}
500-
id @ (SyncId::BackFillBlocks { .. }
501-
| SyncId::RangeBlocks { .. }
502-
| SyncId::BackFillBlockAndBlobs { .. }
503-
| SyncId::RangeBlockAndBlobs { .. }) => id,
500+
id @ SyncId::RangeBlockAndBlobs { .. } => id,
504501
},
505502
RequestId::Router => {
506503
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
@@ -559,10 +556,7 @@ impl<T: BeaconChainTypes> Router<T> {
559556
let request_id = match request_id {
560557
RequestId::Sync(sync_id) => match sync_id {
561558
id @ SyncId::SingleBlock { .. } => id,
562-
SyncId::BackFillBlocks { .. }
563-
| SyncId::RangeBlocks { .. }
564-
| SyncId::RangeBlockAndBlobs { .. }
565-
| SyncId::BackFillBlockAndBlobs { .. } => {
559+
SyncId::RangeBlockAndBlobs { .. } => {
566560
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
567561
return;
568562
}
@@ -604,10 +598,7 @@ impl<T: BeaconChainTypes> Router<T> {
604598
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
605599
return;
606600
}
607-
SyncId::BackFillBlocks { .. }
608-
| SyncId::RangeBlocks { .. }
609-
| SyncId::RangeBlockAndBlobs { .. }
610-
| SyncId::BackFillBlockAndBlobs { .. } => {
601+
SyncId::RangeBlockAndBlobs { .. } => {
611602
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
612603
return;
613604
}

beacon_node/network/src/sync/backfill_sync/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
1111
use crate::network_beacon_processor::ChainSegmentProcessId;
1212
use crate::sync::manager::{BatchProcessResult, Id};
13+
use crate::sync::network_context::RangeRequestId;
1314
use crate::sync::network_context::SyncNetworkContext;
1415
use crate::sync::range_sync::{
1516
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
@@ -961,7 +962,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
961962
) -> Result<(), BackFillError> {
962963
if let Some(batch) = self.batches.get_mut(&batch_id) {
963964
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
964-
match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) {
965+
match network.blocks_and_blobs_by_range_request(
966+
peer,
967+
is_blob_batch,
968+
request,
969+
RangeRequestId::BackfillSync { batch_id },
970+
) {
965971
Ok(request_id) => {
966972
// inform the batch about the new request
967973
if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {

beacon_node/network/src/sync/block_sidecar_coupling.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use ssz_types::VariableList;
33
use std::{collections::VecDeque, sync::Arc};
44
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
55

6-
#[derive(Debug, Default)]
6+
use super::range_sync::ByRangeRequestType;
7+
8+
#[derive(Debug)]
79
pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
810
/// Blocks we have received awaiting for their corresponding sidecar.
911
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
@@ -13,9 +15,25 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
1315
is_blocks_stream_terminated: bool,
1416
/// Whether the individual RPC request for sidecars is finished or not.
1517
is_sidecars_stream_terminated: bool,
18+
/// Used to determine if this accumulator should wait for a sidecars stream termination
19+
request_type: ByRangeRequestType,
1620
}
1721

1822
impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
23+
pub fn new(request_type: ByRangeRequestType) -> Self {
24+
Self {
25+
accumulated_blocks: <_>::default(),
26+
accumulated_sidecars: <_>::default(),
27+
is_blocks_stream_terminated: <_>::default(),
28+
is_sidecars_stream_terminated: <_>::default(),
29+
request_type,
30+
}
31+
}
32+
33+
pub fn get_request_type(&self) -> ByRangeRequestType {
34+
self.request_type
35+
}
36+
1937
pub fn add_block_response(&mut self, block_opt: Option<Arc<SignedBeaconBlock<E>>>) {
2038
match block_opt {
2139
Some(block) => self.accumulated_blocks.push_back(block),
@@ -78,6 +96,38 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
7896
}
7997

8098
pub fn is_finished(&self) -> bool {
81-
self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
99+
let blobs_requested = match self.request_type {
100+
ByRangeRequestType::Blocks => false,
101+
ByRangeRequestType::BlocksAndBlobs => true,
102+
};
103+
self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
104+
}
105+
}
106+
107+
#[cfg(test)]
108+
mod tests {
109+
use super::BlocksAndBlobsRequestInfo;
110+
use crate::sync::range_sync::ByRangeRequestType;
111+
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
112+
use rand::SeedableRng;
113+
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
114+
115+
#[test]
116+
fn no_blobs_into_responses() {
117+
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
118+
let mut rng = XorShiftRng::from_seed([42; 16]);
119+
let blocks = (0..4)
120+
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
121+
.collect::<Vec<_>>();
122+
123+
// Send blocks and complete terminate response
124+
for block in blocks {
125+
info.add_block_response(Some(block.into()));
126+
}
127+
info.add_block_response(None);
128+
129+
// Assert response is finished and RpcBlocks can be constructed
130+
assert!(info.is_finished());
131+
info.into_responses().unwrap();
82132
}
83133
}

0 commit comments

Comments
 (0)