Skip to content

Commit 0e59939

Browse files
authored
Add range sync metrics to track efficiency (#6095)
* Add more range sync metrics to track efficiency
* Add ignored blocks metrics
1 parent 9f40d91 commit 0e59939

File tree

8 files changed

+151
-28
lines changed

8 files changed

+151
-28
lines changed

beacon_node/network/src/metrics.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,36 @@ lazy_static! {
237237
"Number of Syncing chains in range, per range type",
238238
&["range_type"]
239239
);
240+
pub static ref SYNCING_CHAINS_REMOVED: Result<IntCounterVec> = try_create_int_counter_vec(
241+
"sync_range_removed_chains_total",
242+
"Total count of range syncing chains removed per range type",
243+
&["range_type"]
244+
);
245+
pub static ref SYNCING_CHAINS_ADDED: Result<IntCounterVec> = try_create_int_counter_vec(
246+
"sync_range_added_chains_total",
247+
"Total count of range syncing chains added per range type",
248+
&["range_type"]
249+
);
250+
pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
251+
"sync_range_chains_dropped_blocks_total",
252+
"Total count of dropped blocks when removing a syncing chain per range type",
253+
&["range_type"]
254+
);
255+
pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
256+
"sync_range_chains_ignored_blocks_total",
257+
"Total count of ignored blocks when processing a syncing chain batch per chain type",
258+
&["chain_type"]
259+
);
260+
pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result<IntCounterVec> = try_create_int_counter_vec(
261+
"sync_range_chains_processed_batches_total",
262+
"Total count of processed batches in a syncing chain batch per chain type",
263+
&["chain_type"]
264+
);
265+
pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
266+
"sync_range_chain_batch_awaiting_processing_seconds",
267+
"Time range sync batches spend in AwaitingProcessing state",
268+
Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0])
269+
);
240270
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
241271
"sync_single_block_lookups",
242272
"Number of single block lookups underway"

beacon_node/network/src/network_beacon_processor/sync_methods.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
326326
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
327327
.await
328328
{
329-
(_, Ok(_)) => {
329+
(imported_blocks, Ok(_)) => {
330330
debug!(self.log, "Batch processed";
331331
"batch_epoch" => epoch,
332332
"first_block_slot" => start_slot,
@@ -335,7 +335,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
335335
"processed_blocks" => sent_blocks,
336336
"service"=> "sync");
337337
BatchProcessResult::Success {
338-
was_non_empty: sent_blocks > 0,
338+
sent_blocks,
339+
imported_blocks,
339340
}
340341
}
341342
(imported_blocks, Err(e)) => {
@@ -349,7 +350,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
349350
"service" => "sync");
350351
match e.peer_action {
351352
Some(penalty) => BatchProcessResult::FaultyFailure {
352-
imported_blocks: imported_blocks > 0,
353+
imported_blocks,
353354
penalty,
354355
},
355356
None => BatchProcessResult::NonFaultyFailure,
@@ -368,7 +369,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
368369
.sum::<usize>();
369370

370371
match self.process_backfill_blocks(downloaded_blocks) {
371-
(_, Ok(_)) => {
372+
(imported_blocks, Ok(_)) => {
372373
debug!(self.log, "Backfill batch processed";
373374
"batch_epoch" => epoch,
374375
"first_block_slot" => start_slot,
@@ -377,7 +378,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
377378
"processed_blobs" => n_blobs,
378379
"service"=> "sync");
379380
BatchProcessResult::Success {
380-
was_non_empty: sent_blocks > 0,
381+
sent_blocks,
382+
imported_blocks,
381383
}
382384
}
383385
(_, Err(e)) => {
@@ -390,7 +392,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
390392
"service" => "sync");
391393
match e.peer_action {
392394
Some(penalty) => BatchProcessResult::FaultyFailure {
393-
imported_blocks: false,
395+
imported_blocks: 0,
394396
penalty,
395397
},
396398
None => BatchProcessResult::NonFaultyFailure,

beacon_node/network/src/sync/backfill_sync/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
528528
// result callback. This is done, because an empty batch could end a chain and the logic
529529
// for removing chains and checking completion is in the callback.
530530

531-
let blocks = match batch.start_processing() {
531+
let (blocks, _) = match batch.start_processing() {
532532
Err(e) => {
533533
return self
534534
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
@@ -615,13 +615,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
615615
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
616616

617617
match result {
618-
BatchProcessResult::Success { was_non_empty } => {
618+
BatchProcessResult::Success {
619+
imported_blocks, ..
620+
} => {
619621
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
620622
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
621623
}
622624
// If the processed batch imported at least one block, we can validate previous unvalidated
623625
// blocks.
624-
if *was_non_empty {
626+
if *imported_blocks > 0 {
625627
self.advance_chain(network, batch_id);
626628
}
627629

@@ -677,7 +679,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
677679

678680
Ok(BatchOperationOutcome::Continue) => {
679681
// chain can continue. Check if it can be progressed
680-
if *imported_blocks {
682+
if *imported_blocks > 0 {
681683
// At least one block was successfully verified and imported, then we can be sure all
682684
// previous batches are valid and we only need to download the current failed
683685
// batch.

beacon_node/network/src/sync/manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,12 @@ pub enum BlockProcessingResult<E: EthSpec> {
156156
pub enum BatchProcessResult {
157157
/// The batch was completed successfully. It carries the number of blocks sent for processing and the number of blocks that were imported.
158158
Success {
159-
was_non_empty: bool,
159+
sent_blocks: usize,
160+
imported_blocks: usize,
160161
},
161162
/// The batch processing failed. It carries the number of blocks the processing imported and the penalty to apply to the peer.
162163
FaultyFailure {
163-
imported_blocks: bool,
164+
imported_blocks: usize,
164165
penalty: PeerAction,
165166
},
166167
NonFaultyFailure,

beacon_node/network/src/sync/range_sync/batch.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use lighthouse_network::PeerId;
55
use std::collections::HashSet;
66
use std::hash::{Hash, Hasher};
77
use std::ops::Sub;
8+
use std::time::{Duration, Instant};
89
use strum::Display;
910
use types::{Epoch, EthSpec, Slot};
1011

@@ -118,7 +119,7 @@ pub enum BatchState<E: EthSpec> {
118119
/// The batch is being downloaded.
119120
Downloading(PeerId, Id),
120121
/// The batch has been completely downloaded and is ready for processing.
121-
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
122+
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
122123
/// The batch is being processed.
123124
Processing(Attempt),
124125
/// The batch was successfully processed and is waiting to be validated.
@@ -210,13 +211,26 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
210211
match &self.state {
211212
BatchState::AwaitingDownload | BatchState::Failed => None,
212213
BatchState::Downloading(peer_id, _)
213-
| BatchState::AwaitingProcessing(peer_id, _)
214+
| BatchState::AwaitingProcessing(peer_id, _, _)
214215
| BatchState::Processing(Attempt { peer_id, .. })
215216
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
216217
BatchState::Poisoned => unreachable!("Poisoned batch"),
217218
}
218219
}
219220

221+
/// Returns the number of blocks stored in this batch while it is in the `AwaitingProcessing` state, or 0 in any other state.
222+
pub fn pending_blocks(&self) -> usize {
223+
match &self.state {
224+
BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(),
225+
BatchState::AwaitingDownload
226+
| BatchState::Downloading { .. }
227+
| BatchState::Processing { .. }
228+
| BatchState::AwaitingValidation { .. }
229+
| BatchState::Poisoned
230+
| BatchState::Failed => 0,
231+
}
232+
}
233+
220234
/// Returns a BlocksByRange request associated with the batch.
221235
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
222236
(
@@ -293,7 +307,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
293307
}
294308

295309
let received = blocks.len();
296-
self.state = BatchState::AwaitingProcessing(peer, blocks);
310+
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
297311
Ok(received)
298312
}
299313
BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -365,11 +379,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
365379
}
366380
}
367381

368-
pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<E>>, WrongState> {
382+
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
369383
match self.state.poison() {
370-
BatchState::AwaitingProcessing(peer, blocks) => {
384+
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
371385
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
372-
Ok(blocks)
386+
Ok((blocks, start_instant.elapsed()))
373387
}
374388
BatchState::Poisoned => unreachable!("Poisoned batch"),
375389
other => {
@@ -515,7 +529,7 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
515529
}) => write!(f, "AwaitingValidation({})", peer_id),
516530
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
517531
BatchState::Failed => f.write_str("Failed"),
518-
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
532+
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
519533
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
520534
}
521535
BatchState::Downloading(peer, request_id) => {

beacon_node/network/src/sync/range_sync/chain.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
2+
use super::RangeSyncType;
3+
use crate::metrics;
24
use crate::network_beacon_processor::ChainSegmentProcessId;
35
use crate::sync::network_context::RangeRequestId;
46
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
@@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
1113
use slog::{crit, debug, o, warn};
1214
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
1315
use std::hash::{Hash, Hasher};
16+
use strum::IntoStaticStr;
1417
use types::{Epoch, EthSpec, Hash256, Slot};
1518

1619
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@@ -53,13 +56,23 @@ pub struct KeepChain;
5356
pub type ChainId = u64;
5457
pub type BatchId = Epoch;
5558

59+
#[derive(Debug, Copy, Clone, IntoStaticStr)]
60+
pub enum SyncingChainType {
61+
Head,
62+
Finalized,
63+
Backfill,
64+
}
65+
5666
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
5767
/// root are grouped into the peer pool and queried for batches when downloading the
5868
/// chain.
5969
pub struct SyncingChain<T: BeaconChainTypes> {
6070
/// A random id used to identify this chain.
6171
id: ChainId,
6272

73+
/// The type of this syncing chain (head, finalized or backfill). Used as the `chain_type` label when recording metrics.
74+
pub chain_type: SyncingChainType,
75+
6376
/// The start of the chain segment. Any epoch previous to this one has been validated.
6477
pub start_epoch: Epoch,
6578

@@ -126,6 +139,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
126139
target_head_slot: Slot,
127140
target_head_root: Hash256,
128141
peer_id: PeerId,
142+
chain_type: SyncingChainType,
129143
log: &slog::Logger,
130144
) -> Self {
131145
let mut peers = FnvHashMap::default();
@@ -135,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
135149

136150
SyncingChain {
137151
id,
152+
chain_type,
138153
start_epoch,
139154
target_head_slot,
140155
target_head_root,
@@ -171,6 +186,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
171186
self.validated_batches * EPOCHS_PER_BATCH
172187
}
173188

189+
/// Returns the total count of pending blocks in all the batches of this chain
190+
pub fn pending_blocks(&self) -> usize {
191+
self.batches
192+
.values()
193+
.map(|batch| batch.pending_blocks())
194+
.sum()
195+
}
196+
174197
/// Removes a peer from the chain.
175198
/// If the peer has active batches, those are considered failed and re-requested.
176199
pub fn remove_peer(
@@ -305,7 +328,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
305328
// result callback. This is done, because an empty batch could end a chain and the logic
306329
// for removing chains and checking completion is in the callback.
307330

308-
let blocks = batch.start_processing()?;
331+
let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
332+
metrics::observe_duration(
333+
&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
334+
duration_in_awaiting_processing,
335+
);
336+
309337
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
310338
self.current_processing_batch = Some(batch_id);
311339

@@ -469,10 +497,27 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
469497
// We consider three cases. Batch was successfully processed, Batch failed processing due
470498
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
471499
match result {
472-
BatchProcessResult::Success { was_non_empty } => {
500+
BatchProcessResult::Success {
501+
sent_blocks,
502+
imported_blocks,
503+
} => {
504+
if sent_blocks > imported_blocks {
505+
let ignored_blocks = sent_blocks - imported_blocks;
506+
metrics::inc_counter_vec_by(
507+
&metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
508+
&[self.chain_type.into()],
509+
ignored_blocks as u64,
510+
);
511+
}
512+
metrics::inc_counter_vec(
513+
&metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
514+
&[self.chain_type.into()],
515+
);
516+
473517
batch.processing_completed(BatchProcessingResult::Success)?;
474518

475-
if *was_non_empty {
519+
// was not empty = sent_blocks > 0
520+
if *sent_blocks > 0 {
476521
// If the processed batch was not empty, we can validate previous unvalidated
477522
// blocks.
478523
self.advance_chain(network, batch_id);
@@ -515,7 +560,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
515560
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
516561
BatchOperationOutcome::Continue => {
517562
// Chain can continue. Check if it can be moved forward.
518-
if *imported_blocks {
563+
if *imported_blocks > 0 {
519564
// At least one block was successfully verified and imported, so we can be sure all
520565
// previous batches are valid and we only need to download the current failed
521566
// batch.
@@ -1142,3 +1187,12 @@ impl RemoveChain {
11421187
)
11431188
}
11441189
}
1190+
1191+
impl From<RangeSyncType> for SyncingChainType {
1192+
fn from(value: RangeSyncType) -> Self {
1193+
match value {
1194+
RangeSyncType::Head => Self::Head,
1195+
RangeSyncType::Finalized => Self::Finalized,
1196+
}
1197+
}
1198+
}

0 commit comments

Comments
 (0)