
Commit 2367153

Address @jimmygchen review
1 parent 9de3178 commit 2367153

6 files changed: +107 -36 lines

beacon_node/beacon_processor/src/lib.rs

Lines changed: 13 additions & 0 deletions
@@ -1000,6 +1000,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
                     self.spawn_worker(item, idle_tx);
                 } else if let Some(item) = rpc_blob_queue.pop() {
                     self.spawn_worker(item, idle_tx);
+                // TODO(das): decide proper prioritization for sampling columns
+                } else if let Some(item) = rpc_verify_data_column_queue.pop() {
+                    self.spawn_worker(item, idle_tx);
+                } else if let Some(item) = sampling_result_queue.pop() {
+                    self.spawn_worker(item, idle_tx);
                 // Check delayed blocks before gossip blocks, the gossip blocks might rely
                 // on the delayed ones.
                 } else if let Some(item) = delayed_block_queue.pop() {
@@ -1389,6 +1394,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
                &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
                rpc_blob_queue.len() as i64,
            );
+           metrics::set_gauge(
+               &metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL,
+               rpc_verify_data_column_queue.len() as i64,
+           );
+           metrics::set_gauge(
+               &metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL,
+               sampling_result_queue.len() as i64,
+           );
            metrics::set_gauge(
                &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
                chain_segment_queue.len() as i64,
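
A quick illustrative sketch (not from the Lighthouse codebase): the two new queues are served purely by their position in the else-if chain, after RPC blobs and before delayed blocks. Below is a hypothetical, stripped-down version of that pop-in-priority-order pattern using plain VecDeques in place of the real work queues.

use std::collections::VecDeque;

// Each queue is polled in turn and the first non-empty one wins, so the new data-column
// verification and sampling-result queues rank below RPC blobs only because of their
// position in this chain.
fn next_item(
    rpc_blob_queue: &mut VecDeque<&'static str>,
    rpc_verify_data_column_queue: &mut VecDeque<&'static str>,
    sampling_result_queue: &mut VecDeque<&'static str>,
) -> Option<&'static str> {
    if let Some(item) = rpc_blob_queue.pop_front() {
        Some(item)
    } else if let Some(item) = rpc_verify_data_column_queue.pop_front() {
        Some(item)
    } else if let Some(item) = sampling_result_queue.pop_front() {
        Some(item)
    } else {
        None
    }
}

fn main() {
    let mut blobs = VecDeque::new();
    let mut columns = VecDeque::from(["verify column 3"]);
    let mut samples = VecDeque::from(["sampling result for block A"]);
    // With the blob queue empty, the data-column queue is served before sampling results.
    assert_eq!(
        next_item(&mut blobs, &mut columns, &mut samples),
        Some("verify column 3")
    );
}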

beacon_node/beacon_processor/src/metrics.rs

Lines changed: 10 additions & 0 deletions
@@ -86,6 +86,16 @@ lazy_static::lazy_static! {
         "beacon_processor_rpc_blob_queue_total",
         "Count of blobs from the rpc waiting to be verified."
     );
+    // Rpc verify data columns
+    pub static ref BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_rpc_verify_data_column_queue_total",
+        "Count of data columns from the rpc waiting to be verified."
+    );
+    // Sampling result
+    pub static ref BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_sampling_result_queue_total",
+        "Count of sampling results waiting to be processed."
+    );
     // Chain segments.
     pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
         "beacon_processor_chain_segment_queue_total",

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 5 additions & 3 deletions
@@ -379,9 +379,11 @@ pub struct DataColumnsByRootRequest {
 }
 
 impl DataColumnsByRootRequest {
-    pub fn new(blob_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
-        let data_column_ids =
-            RuntimeVariableList::from_vec(blob_ids, spec.max_request_data_column_sidecars as usize);
+    pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
+        let data_column_ids = RuntimeVariableList::from_vec(
+            data_column_ids,
+            spec.max_request_data_column_sidecars as usize,
+        );
         Self { data_column_ids }
     }
 }
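
A quick illustrative sketch (not from the Lighthouse codebase): RuntimeVariableList::from_vec bounds the request by max_request_data_column_sidecars at runtime. The stub below is a simplified stand-in that mirrors the truncation behaviour as I understand it; the real type lives in Lighthouse's types crates and may enforce the limit differently.

// Simplified stand-in: anything beyond the runtime limit is dropped rather than rejected.
#[derive(Debug)]
struct RuntimeVariableList<T> {
    items: Vec<T>,
    max_len: usize,
}

impl<T> RuntimeVariableList<T> {
    fn from_vec(mut items: Vec<T>, max_len: usize) -> Self {
        items.truncate(max_len);
        Self { items, max_len }
    }
}

fn main() {
    // Pretend the spec allows at most 2 data column identifiers per request.
    let ids = vec!["id-0", "id-1", "id-2"];
    let list = RuntimeVariableList::from_vec(ids, 2);
    assert_eq!(list.items.len(), 2);
    assert_eq!(list.max_len, 2);
}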

beacon_node/network/src/sync/manager.rs

Lines changed: 14 additions & 16 deletions
@@ -125,6 +125,7 @@ pub enum SyncMessage<E: EthSpec> {
         seen_timestamp: Duration,
     },
 
+    /// A data column has been received from the RPC
     RpcDataColumn {
         request_id: RequestId,
         peer_id: PeerId,
@@ -1044,25 +1045,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         peer_id: PeerId,
         data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) {
-        let Some((requester, resp)) = self
+        if let Some((requester, resp)) = self
             .network
             .on_data_columns_by_root_response(id, data_column)
-        else {
-            // TOOD(das): error o log
-            return;
-        };
-
-        match requester {
-            DataColumnsByRootRequester::Sampling(id) => {
-                if let Some(result) =
-                    self.sampling
-                        .on_sample_downloaded(id, peer_id, resp, &mut self.network)
-                {
-                    self.on_sampling_result(id.id, result)
+        {
+            match requester {
+                DataColumnsByRootRequester::Sampling(id) => {
+                    if let Some(result) =
+                        self.sampling
+                            .on_sample_downloaded(id, peer_id, resp, &mut self.network)
+                    {
+                        self.on_sampling_result(id.id, result)
+                    }
+                }
+                DataColumnsByRootRequester::Custody => {
+                    todo!("TODO(das): handle custody requests");
                 }
-            }
-            DataColumnsByRootRequester::Custody => {
-                todo!("TODO(das): handle custody requests");
             }
         }
     }
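
A quick illustrative sketch (not from the Lighthouse codebase): the change above swaps a let-else early return for an if-let block, dropping the old TODO about logging the miss. Below is a generic, hypothetical comparison of the two shapes using stand-in types.

// Both forms silently ignore a missing request; only the control-flow shape differs.
fn with_let_else(response: Option<(u32, &str)>) {
    let Some((requester, resp)) = response else {
        // Early return: no matching request (the removed code carried a TODO to log here).
        return;
    };
    println!("requester {requester} got {resp}");
}

fn with_if_let(response: Option<(u32, &str)>) {
    if let Some((requester, resp)) = response {
        println!("requester {requester} got {resp}");
    }
}

fn main() {
    with_let_else(Some((1, "column")));
    with_if_let(Some((1, "column")));
    with_let_else(None);
    with_if_let(None);
}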

beacon_node/network/src/sync/network_context.rs

Lines changed: 12 additions & 8 deletions
@@ -20,7 +20,9 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::BlobsByRangeRequest;
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
-use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
+use lighthouse_network::{
+    Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
+};
 pub use requests::LookupVerifyError;
 use slog::{debug, trace, warn};
 use std::collections::hash_map::Entry;
@@ -59,7 +61,7 @@ pub enum RpcEvent<T> {
     RPCError(RPCError),
 }
 
-pub type RpcProcessingResult<T, ID> = Option<(ID, Result<(T, Duration), LookupFailure>)>;
+pub type RpcProcessingResult<ID, T> = Option<(ID, Result<(T, Duration), LookupFailure>)>;
 
 pub enum LookupFailure {
     RpcError(RPCError),
@@ -163,10 +165,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
         let mut peer_ids = vec![];
 
-        for (peer_id, peer_info) in self.network_globals().peers.read().peers() {
+        for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
             if let Some(enr) = peer_info.enr() {
-                // TODO(das): do not hardcode `custody_subnet_count`
-                let custody_subnet_count = 2;
+                // TODO(das): ignores decode errors
+                let custody_subnet_count = enr
+                    .custody_subnet_count::<T::EthSpec>()
+                    .unwrap_or(T::EthSpec::min_custody_requirement() as u64);
                 // TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
                 // whenever a peer connected or disconnect event in received
                 let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
@@ -551,7 +555,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         request_id: SingleLookupReqId,
         block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
-    ) -> RpcProcessingResult<Arc<SignedBeaconBlock<T::EthSpec>>, ()> {
+    ) -> RpcProcessingResult<(), Arc<SignedBeaconBlock<T::EthSpec>>> {
         let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
             return None;
         };
@@ -583,7 +587,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         request_id: SingleLookupReqId,
         blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
-    ) -> RpcProcessingResult<FixedBlobSidecarList<T::EthSpec>, ()> {
+    ) -> RpcProcessingResult<(), FixedBlobSidecarList<T::EthSpec>> {
         let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
             return None;
         };
@@ -620,7 +624,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         id: Id,
         item: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
-    ) -> RpcProcessingResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>, DataColumnsByRootRequester>
+    ) -> RpcProcessingResult<DataColumnsByRootRequester, Vec<Arc<DataColumnSidecar<T::EthSpec>>>>
     {
         let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
             return None;
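
A quick illustrative sketch (not from the Lighthouse codebase): the alias reorder puts the requester ID first and the payload second, matching how call sites destructure the tuple ("who asked, then what came back"). Below is a self-contained version with LookupFailure stubbed; the real enum lives in network_context.rs.

use std::time::Duration;

#[derive(Debug)]
enum LookupFailure {
    RpcError(String),
}

// Same shape as the reordered alias in the hunk above.
type RpcProcessingResult<ID, T> = Option<(ID, Result<(T, Duration), LookupFailure>)>;

fn route_response<ID: std::fmt::Debug, T>(result: RpcProcessingResult<ID, T>) {
    match result {
        None => {} // no active request matched this response; nothing to do
        Some((id, Ok((_payload, seen_timestamp)))) => {
            println!("requester {id:?} got a response after {seen_timestamp:?}");
        }
        Some((id, Err(failure))) => {
            println!("request for {id:?} failed: {failure:?}");
        }
    }
}

fn main() {
    let ok: RpcProcessingResult<u32, Vec<u8>> =
        Some((42, Ok((vec![0u8; 4], Duration::from_millis(120)))));
    route_response(ok);

    let failed: RpcProcessingResult<u32, Vec<u8>> =
        Some((7, Err(LookupFailure::RpcError("timeout".into()))));
    route_response(failed);
}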

beacon_node/network/src/sync/sampling.rs

Lines changed: 53 additions & 9 deletions
@@ -10,7 +10,7 @@ use std::{
     collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc,
     time::Duration,
 };
-use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256, Slot};
+use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, EthSpec, Hash256, Slot};
 
 pub type SamplingResult = Result<(), SamplingError>;
 
@@ -48,6 +48,12 @@ impl<T: BeaconChainTypes> Sampling<T> {
         self.requests.values().map(|r| r.block_root).collect()
     }
 
+    /// Create a new sampling request for a known block
+    ///
+    /// ### Returns
+    ///
+    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
+    /// - `None`: Request still active, requester should take no action
     pub fn on_new_sample_request(
         &mut self,
         block_root: Hash256,
@@ -81,6 +87,13 @@ impl<T: BeaconChainTypes> Sampling<T> {
             .map(|result| (requester, result))
     }
 
+    /// Insert a downloaded column into an active sampling request. Then make progress on the
+    /// entire request.
+    ///
+    /// ### Returns
+    ///
+    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
+    /// - `None`: Request still active, requester should take no action
    pub fn on_sample_downloaded(
         &mut self,
         id: SamplingId,
@@ -98,6 +111,13 @@ impl<T: BeaconChainTypes> Sampling<T> {
         self.handle_sampling_result(result, &id.id)
     }
 
+    /// Insert a column verification result into an active sampling request. Then make progress
+    /// on the entire request.
+    ///
+    /// ### Returns
+    ///
+    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
+    /// - `None`: Request still active, requester should take no action
     pub fn on_sample_verified(
         &mut self,
         id: SamplingId,
@@ -114,14 +134,17 @@ impl<T: BeaconChainTypes> Sampling<T> {
         self.handle_sampling_result(result, &id.id)
     }
 
+    /// Converts a result from the internal format of `ActiveSamplingRequest` (error-first, so `?`
+    /// can be used conveniently) into an Option-first format suited to an
+    /// `if let Some() { act on result }` pattern in the sync manager.
     fn handle_sampling_result(
         &mut self,
         result: Result<Option<()>, SamplingError>,
         id: &SamplingRequester,
     ) -> Option<SamplingResult> {
         let result = result.transpose();
         if result.is_some() {
-            debug!(self.log, "Removed sampling request"; "id" => ?id);
+            debug!(self.log, "Remove completed sampling request"; "id" => ?id, "result" => ?result);
             self.requests.remove(id);
         }
         result
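
A quick illustrative sketch (not from the Lighthouse codebase): the new doc comment describes a Result-to-Option flip done with transpose(). Below is a minimal, self-contained version of that conversion with SamplingError stubbed.

#[derive(Debug, PartialEq)]
enum SamplingError {
    TooManyFailures,
}

// Error-first internally (so `?` composes), Option-first for the caller (so the sync
// manager can use a plain `if let Some(result)`).
fn handle_sampling_result(
    result: Result<Option<()>, SamplingError>,
) -> Option<Result<(), SamplingError>> {
    let result = result.transpose();
    if result.is_some() {
        // In the real code the completed request is also removed from `self.requests` here.
    }
    result
}

fn main() {
    assert_eq!(handle_sampling_result(Ok(None)), None); // still in progress
    assert_eq!(handle_sampling_result(Ok(Some(()))), Some(Ok(()))); // completed successfully
    assert_eq!(
        handle_sampling_result(Err(SamplingError::TooManyFailures)),
        Some(Err(SamplingError::TooManyFailures)) // failed, will be dropped
    );
}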
@@ -146,6 +169,7 @@ pub enum SamplingError {
     ProcessorUnavailable,
     TooManyFailures,
     BadState(String),
+    ColumnIndexOutOfBounds,
 }
 
 /// Required success index by current failures, with p_target=5.00E-06
@@ -170,7 +194,8 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         log: slog::Logger,
     ) -> Self {
         // Select ahead of time the full list of to-sample columns
-        let mut column_shuffle = (0..64).collect::<Vec<ColumnIndex>>();
+        let mut column_shuffle = (0..<T::EthSpec as EthSpec>::number_of_columns() as ColumnIndex)
+            .collect::<Vec<ColumnIndex>>();
         let mut rng = thread_rng();
         column_shuffle.shuffle(&mut rng);
 
@@ -189,9 +214,15 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         }
     }
 
-    // TODO: When is a fork and only a subset of your peers know about a block, sampling should only
-    // be queried on the peers on that fork. Should this case be handled? How to handle it?
-    fn on_sample_downloaded(
+    /// Insert a downloaded column into an active sampling request. Then make progress on the
+    /// entire request.
+    ///
+    /// ### Returns
+    ///
+    /// - `Err`: Sampling request has failed and will be dropped
+    /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped
+    /// - `Ok(None)`: Sampling request still active
+    pub(crate) fn on_sample_downloaded(
         &mut self,
         _peer_id: PeerId,
         column_index: ColumnIndex,
@@ -258,6 +289,14 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         self.continue_sampling(cx)
     }
 
+    /// Insert a column verification result into an active sampling request. Then make progress
+    /// on the entire request.
+    ///
+    /// ### Returns
+    ///
+    /// - `Err`: Sampling request has failed and will be dropped
+    /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped
+    /// - `Ok(None)`: Sampling request still active
     pub(crate) fn on_sample_verified(
         &mut self,
         column_index: ColumnIndex,
@@ -301,7 +340,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         self.continue_sampling(cx)
     }
 
-    fn continue_sampling(
+    pub(crate) fn continue_sampling(
         &mut self,
         cx: &mut SyncNetworkContext<T>,
     ) -> Result<Option<()>, SamplingError> {
@@ -337,8 +376,11 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
         // First, attempt to progress sampling by requesting more columns, so that request failures
         // are accounted for below.
         for idx in 0..*required_successes {
-            // Re-request columns
-            let column_index = self.column_shuffle[idx];
+            // Re-request columns. Note: an out-of-bounds error should never happen, inputs are hardcoded
+            let column_index = *self
+                .column_shuffle
+                .get(idx)
+                .ok_or(SamplingError::ColumnIndexOutOfBounds)?;
             let request = self
                 .column_requests
                 .entry(column_index)
@@ -431,6 +473,8 @@ mod request {
            Status::Verified => return Ok(false), // Already completed
        }
 
+       // TODO: When there is a fork and only a subset of peers know about a block, sampling should
+       // only be queried on the peers on that fork. Should this case be handled? How?
        let peer_ids = cx.get_custodial_peers(
            block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
            self.column_index,
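
A quick illustrative sketch (not from the Lighthouse codebase): the continue_sampling hunk replaces direct indexing with get(idx).ok_or(...)?, backed by the new ColumnIndexOutOfBounds variant. A small stand-alone version of that guard:

#[derive(Debug, PartialEq)]
enum SamplingError {
    ColumnIndexOutOfBounds,
}

// Out of bounds "should never happen" (the inputs are derived from spec constants), but
// an explicit error beats an index panic on the sync thread.
fn pick_column(column_shuffle: &[u64], idx: usize) -> Result<u64, SamplingError> {
    let column_index = *column_shuffle
        .get(idx)
        .ok_or(SamplingError::ColumnIndexOutOfBounds)?;
    Ok(column_index)
}

fn main() {
    let shuffle = vec![5, 1, 3];
    assert_eq!(pick_column(&shuffle, 1), Ok(1));
    assert_eq!(pick_column(&shuffle, 9), Err(SamplingError::ColumnIndexOutOfBounds));
}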
