Skip to content

Commit f9b4202

Browse files
committed
Handle sync lookup request streams in network context #5583
1 parent 5750d49 commit f9b4202

File tree

8 files changed

+621
-534
lines changed

8 files changed

+621
-534
lines changed

beacon_node/network/src/sync/block_lookups/common.rs

Lines changed: 24 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
22
use crate::sync::block_lookups::single_block_lookup::{
3-
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
3+
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
44
};
55
use crate::sync::block_lookups::{
66
BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
77
};
88
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
9-
use crate::sync::network_context::SyncNetworkContext;
9+
use crate::sync::network_context::{
10+
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
11+
};
1012
use beacon_chain::block_verification_types::RpcBlock;
1113
use beacon_chain::data_availability_checker::ChildComponents;
12-
use beacon_chain::{get_block_root, BeaconChainTypes};
13-
use lighthouse_network::rpc::methods::BlobsByRootRequest;
14-
use lighthouse_network::rpc::BlocksByRootRequest;
15-
use std::ops::IndexMut;
14+
use beacon_chain::BeaconChainTypes;
1615
use std::sync::Arc;
1716
use std::time::Duration;
18-
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
19-
use types::{BlobSidecar, ChainSpec, Hash256, SignedBeaconBlock};
17+
use types::blob_sidecar::FixedBlobSidecarList;
18+
use types::{Hash256, SignedBeaconBlock};
2019

2120
#[derive(Debug, Copy, Clone)]
2221
pub enum ResponseType {
@@ -73,9 +72,6 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
7372
/// The type of the request .
7473
type RequestType;
7574

76-
/// A block or blob response.
77-
type ResponseType;
78-
7975
/// The type created after validation.
8076
type VerifiedResponseType: Clone;
8177

@@ -85,30 +81,27 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
8581
/* Request building methods */
8682

8783
/// Construct a new request.
88-
fn build_request(
89-
&mut self,
90-
spec: &ChainSpec,
91-
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
84+
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
9285
// Verify and construct request.
9386
self.too_many_attempts()?;
9487
let peer = self.get_peer()?;
95-
let request = self.new_request(spec);
88+
let request = self.new_request();
9689
Ok((peer, request))
9790
}
9891

9992
/// Construct a new request and send it.
10093
fn build_request_and_send(
10194
&mut self,
10295
id: Id,
103-
cx: &SyncNetworkContext<T>,
96+
cx: &mut SyncNetworkContext<T>,
10497
) -> Result<(), LookupRequestError> {
10598
// Check if request is necessary.
10699
if !self.get_state().is_awaiting_download() {
107100
return Ok(());
108101
}
109102

110103
// Construct request.
111-
let (peer_id, request) = self.build_request(&cx.chain.spec)?;
104+
let (peer_id, request) = self.build_request()?;
112105

113106
// Update request state.
114107
let req_counter = self.get_state_mut().on_download_start(peer_id);
@@ -144,61 +137,18 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
144137
}
145138

146139
/// Initialize `Self::RequestType`.
147-
fn new_request(&self, spec: &ChainSpec) -> Self::RequestType;
140+
fn new_request(&self) -> Self::RequestType;
148141

149142
/// Send the request to the network service.
150143
fn make_request(
151144
id: SingleLookupReqId,
152145
peer_id: PeerId,
153146
request: Self::RequestType,
154-
cx: &SyncNetworkContext<T>,
147+
cx: &mut SyncNetworkContext<T>,
155148
) -> Result<(), LookupRequestError>;
156149

157150
/* Response handling methods */
158151

159-
/// Verify the response is valid based on what we requested.
160-
fn verify_response(
161-
&mut self,
162-
expected_block_root: Hash256,
163-
peer_id: PeerId,
164-
response: Option<Self::ResponseType>,
165-
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError> {
166-
let result = match *self.get_state().get_state() {
167-
State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned),
168-
State::Downloading { peer_id: _ } => {
169-
// TODO: We requested a download from Downloading { peer_id }, but the network
170-
// injects a response from a different peer_id. What should we do? The peer_id to
171-
// track for scoring is the one that actually sent the response, not the state's
172-
self.verify_response_inner(expected_block_root, response)
173-
}
174-
State::Processing { .. } | State::Processed { .. } => match response {
175-
// We sent the block for processing and received an extra block.
176-
Some(_) => Err(LookupVerifyError::ExtraBlocksReturned),
177-
// This is simply the stream termination and we are already processing the block
178-
None => Ok(None),
179-
},
180-
};
181-
182-
match result {
183-
Ok(Some(response)) => {
184-
self.get_state_mut().on_download_success(peer_id);
185-
Ok(Some(response))
186-
}
187-
Ok(None) => Ok(None),
188-
Err(e) => {
189-
self.get_state_mut().on_download_failure();
190-
Err(e)
191-
}
192-
}
193-
}
194-
195-
/// The response verification unique to block or blobs.
196-
fn verify_response_inner(
197-
&mut self,
198-
expected_block_root: Hash256,
199-
response: Option<Self::ResponseType>,
200-
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;
201-
202152
/// A getter for the parent root of the response. Returns an `Option` because we won't know
203153
/// the blob parent if we don't end up getting any blobs in the response.
204154
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
@@ -247,49 +197,24 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
247197
}
248198

249199
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> {
250-
type RequestType = BlocksByRootRequest;
251-
type ResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
200+
type RequestType = BlocksByRootSingleRequest;
252201
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
253202
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
254203

255-
fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest {
256-
BlocksByRootRequest::new(vec![self.requested_block_root], spec)
204+
fn new_request(&self) -> Self::RequestType {
205+
BlocksByRootSingleRequest(self.requested_block_root)
257206
}
258207

259208
fn make_request(
260209
id: SingleLookupReqId,
261210
peer_id: PeerId,
262211
request: Self::RequestType,
263-
cx: &SyncNetworkContext<T>,
212+
cx: &mut SyncNetworkContext<T>,
264213
) -> Result<(), LookupRequestError> {
265214
cx.block_lookup_request(id, peer_id, request)
266215
.map_err(LookupRequestError::SendFailed)
267216
}
268217

269-
fn verify_response_inner(
270-
&mut self,
271-
expected_block_root: Hash256,
272-
response: Option<Self::ResponseType>,
273-
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
274-
match response {
275-
Some(block) => {
276-
// Compute the block root using this specific function so that we can get timing
277-
// metrics.
278-
let block_root = get_block_root(&block);
279-
if block_root != expected_block_root {
280-
// return an error and drop the block
281-
// NOTE: we take this is as a download failure to prevent counting the
282-
// attempt as a chain failure, but simply a peer failure.
283-
Err(LookupVerifyError::RootMismatch)
284-
} else {
285-
// Return the block for processing.
286-
Ok(Some(block))
287-
}
288-
}
289-
None => Err(LookupVerifyError::NoBlockReturned),
290-
}
291-
}
292-
293218
fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
294219
Some(verified_response.parent_root())
295220
}
@@ -340,60 +265,27 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
340265
}
341266

342267
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> {
343-
type RequestType = BlobsByRootRequest;
344-
type ResponseType = Arc<BlobSidecar<T::EthSpec>>;
268+
type RequestType = BlobsByRootSingleBlockRequest;
345269
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
346270
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
347271

348-
fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest {
349-
let blob_id_vec: Vec<BlobIdentifier> = self.requested_ids.clone().into();
350-
BlobsByRootRequest::new(blob_id_vec, spec)
272+
fn new_request(&self) -> Self::RequestType {
273+
BlobsByRootSingleBlockRequest {
274+
block_root: self.block_root,
275+
indices: self.requested_ids.indices(),
276+
}
351277
}
352278

353279
fn make_request(
354280
id: SingleLookupReqId,
355281
peer_id: PeerId,
356282
request: Self::RequestType,
357-
cx: &SyncNetworkContext<T>,
283+
cx: &mut SyncNetworkContext<T>,
358284
) -> Result<(), LookupRequestError> {
359285
cx.blob_lookup_request(id, peer_id, request)
360286
.map_err(LookupRequestError::SendFailed)
361287
}
362288

363-
fn verify_response_inner(
364-
&mut self,
365-
expected_block_root: Hash256,
366-
blob: Option<Self::ResponseType>,
367-
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
368-
match blob {
369-
Some(blob) => {
370-
let received_id = blob.id();
371-
372-
if !self.requested_ids.contains(&received_id) {
373-
return Err(LookupVerifyError::UnrequestedBlobId(received_id));
374-
}
375-
if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
376-
return Err(LookupVerifyError::InvalidInclusionProof);
377-
}
378-
if blob.block_root() != expected_block_root {
379-
return Err(LookupVerifyError::UnrequestedHeader);
380-
}
381-
382-
// State should remain downloading until we receive the stream terminator.
383-
self.requested_ids.remove(&received_id);
384-
385-
// The inclusion proof check above ensures `blob.index` is < MAX_BLOBS_PER_BLOCK
386-
let blob_index = blob.index;
387-
*self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
388-
Ok(None)
389-
}
390-
None => {
391-
let blobs = std::mem::take(&mut self.blob_download_queue);
392-
Ok(Some(blobs))
393-
}
394-
}
395-
}
396-
397289
fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
398290
verified_response
399291
.into_iter()

0 commit comments

Comments
 (0)