Skip to content

Commit e392478

Browse files
committed
Wait before column reconstruction (#7594)
Squashed commit of the following: commit 719df16 Author: Jimmy Chen <[email protected]> Date: Fri Jun 13 16:02:51 2025 +0200 Pop from column_reconstruction_queue. Co-authored-by: dknopik <[email protected]> commit 19113d1 Author: Daniel Knopik <[email protected]> Date: Fri Jun 13 14:16:13 2025 +0200 FORMAT? commit ab94e8a Author: Daniel Knopik <[email protected]> Date: Fri Jun 13 14:12:28 2025 +0200 maybe fix test? commit 0ee7abf Author: Daniel Knopik <[email protected]> Date: Thu Jun 12 16:44:27 2025 +0200 remove obsolete change commit 36ca44d Merge: 5a24f45 5472cb8 Author: Daniel Knopik <[email protected]> Date: Thu Jun 12 16:39:53 2025 +0200 Merge branch 'unstable' into wait-before-reconstruction commit 5a24f45 Author: Daniel Knopik <[email protected]> Date: Thu Jun 12 16:38:15 2025 +0200 refactor commit 891af69 Author: Daniel Knopik <[email protected]> Date: Thu Jun 12 09:28:49 2025 +0200 fmt commit b7c0dd1 Merge: 492d73c 9803d69 Author: Daniel Knopik <[email protected]> Date: Thu Jun 12 09:22:33 2025 +0200 Merge branch 'unstable' into wait-before-reconstruction # Conflicts: # beacon_node/network/src/network_beacon_processor/mod.rs commit 492d73c Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 15:50:28 2025 +0200 make sure to let only retry start reconstruction commit b2f7a5b Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 15:15:44 2025 +0200 correctly reschedule reconstruction commit f813767 Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 14:42:00 2025 +0200 fmt commit c7f5211 Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 14:38:26 2025 +0200 Log and restructure commit 65e5ac4 Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 14:05:50 2025 +0200 Use processor commit e2216c0 Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 09:26:10 2025 +0200 try 100ms commit bef3344 Author: Daniel Knopik <[email protected]> Date: Wed Jun 11 09:07:07 2025 +0200 Logging for testing commit 1eee636 Author: Daniel Knopik <[email protected]> Date: Tue Jun 10 18:05:30 2025 +0200 Wait for more columns before starting reconstruction
1 parent b5865e8 commit e392478

File tree

4 files changed

+110
-3
lines changed

4 files changed

+110
-3
lines changed

beacon_node/beacon_processor/src/lib.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
//! task.
4040
4141
use crate::work_reprocessing_queue::{
42-
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
42+
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
4343
};
4444
use futures::stream::{Stream, StreamExt};
4545
use futures::task::Poll;
@@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths {
117117
rpc_custody_column_queue: usize,
118118
rpc_verify_data_column_queue: usize,
119119
sampling_result_queue: usize,
120+
column_reconstruction_queue: usize,
120121
chain_segment_queue: usize,
121122
backfill_chain_segment: usize,
122123
gossip_block_queue: usize,
@@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths {
184185
rpc_verify_data_column_queue: 1000,
185186
unknown_block_sampling_request_queue: 16384,
186187
sampling_result_queue: 1000,
188+
column_reconstruction_queue: 64,
187189
chain_segment_queue: 64,
188190
backfill_chain_segment: 64,
189191
gossip_block_queue: 1024,
@@ -498,6 +500,12 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
498500
drop_during_sync: false,
499501
work: Work::ChainSegmentBackfill(process_fn),
500502
},
503+
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
504+
Self {
505+
drop_during_sync: true,
506+
work: Work::ColumnReconstruction(process_fn),
507+
}
508+
}
501509
}
502510
}
503511
}
@@ -619,6 +627,7 @@ pub enum Work<E: EthSpec> {
619627
RpcCustodyColumn(AsyncFn),
620628
RpcVerifyDataColumn(AsyncFn),
621629
SamplingResult(AsyncFn),
630+
ColumnReconstruction(AsyncFn),
622631
IgnoredRpcBlock {
623632
process_fn: BlockingFn,
624633
},
@@ -674,6 +683,7 @@ pub enum WorkType {
674683
RpcCustodyColumn,
675684
RpcVerifyDataColumn,
676685
SamplingResult,
686+
ColumnReconstruction,
677687
IgnoredRpcBlock,
678688
ChainSegment,
679689
ChainSegmentBackfill,
@@ -725,6 +735,7 @@ impl<E: EthSpec> Work<E> {
725735
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
726736
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
727737
Work::SamplingResult { .. } => WorkType::SamplingResult,
738+
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
728739
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
729740
Work::ChainSegment { .. } => WorkType::ChainSegment,
730741
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -891,6 +902,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
891902
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
892903
// TODO(das): the sampling_request_queue is never read
893904
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
905+
let mut column_reconstruction_queue =
906+
FifoQueue::new(queue_lengths.column_reconstruction_queue);
894907
let mut unknown_block_sampling_request_queue =
895908
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
896909
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
@@ -1072,6 +1085,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
10721085
Some(item)
10731086
} else if let Some(item) = gossip_data_column_queue.pop() {
10741087
Some(item)
1088+
} else if let Some(item) = column_reconstruction_queue.pop() {
1089+
Some(item)
10751090
// Check the priority 0 API requests after blocks and blobs, but before attestations.
10761091
} else if let Some(item) = api_request_p0_queue.pop() {
10771092
Some(item)
@@ -1371,6 +1386,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
13711386
rpc_verify_data_column_queue.push(work, work_id)
13721387
}
13731388
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
1389+
Work::ColumnReconstruction(_) => {
1390+
column_reconstruction_queue.push(work, work_id)
1391+
}
13741392
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
13751393
Work::ChainSegmentBackfill { .. } => {
13761394
backfill_chain_segment.push(work, work_id)
@@ -1460,6 +1478,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14601478
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
14611479
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
14621480
WorkType::SamplingResult => sampling_result_queue.len(),
1481+
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
14631482
WorkType::ChainSegment => chain_segment_queue.len(),
14641483
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
14651484
WorkType::Status => status_queue.len(),
@@ -1602,7 +1621,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
16021621
| Work::RpcBlobs { process_fn }
16031622
| Work::RpcCustodyColumn(process_fn)
16041623
| Work::RpcVerifyDataColumn(process_fn)
1605-
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
1624+
| Work::SamplingResult(process_fn)
1625+
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
16061626
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
16071627
Work::GossipBlock(work)
16081628
| Work::GossipBlobSidecar(work)

beacon_node/beacon_processor/src/work_reprocessing_queue.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use itertools::Itertools;
1919
use logging::crit;
2020
use logging::TimeLatch;
2121
use slot_clock::SlotClock;
22+
use std::collections::hash_map::Entry;
2223
use std::collections::{HashMap, HashSet};
2324
use std::future::Future;
2425
use std::pin::Pin;
@@ -54,6 +55,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
5455
/// For how long to queue sampling requests for reprocessing.
5556
pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);
5657

58+
/// For how long to queue delayed column reconstruction.
59+
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
60+
5761
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
5862
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
5963
/// it's nice to have extra protection.
@@ -109,6 +113,8 @@ pub enum ReprocessQueueMessage {
109113
UnknownBlockSamplingRequest(QueuedSamplingRequest),
110114
/// A new backfill batch that needs to be scheduled for processing.
111115
BackfillSync(QueuedBackfillBatch),
116+
/// A delayed column reconstruction that needs checking
117+
DelayColumnReconstruction(QueuedColumnReconstruction),
112118
}
113119

114120
/// Events sent by the scheduler once they are ready for re-processing.
@@ -121,6 +127,7 @@ pub enum ReadyWork {
121127
LightClientUpdate(QueuedLightClientUpdate),
122128
SamplingRequest(QueuedSamplingRequest),
123129
BackfillSync(QueuedBackfillBatch),
130+
ColumnReconstruction(QueuedColumnReconstruction),
124131
}
125132

126133
/// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -176,6 +183,11 @@ pub struct IgnoredRpcBlock {
176183
/// A backfill batch work that has been queued for processing later.
177184
pub struct QueuedBackfillBatch(pub AsyncFn);
178185

186+
pub struct QueuedColumnReconstruction {
187+
pub block_root: Hash256,
188+
pub process_fn: AsyncFn,
189+
}
190+
179191
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
180192
type Error = WorkEvent<E>;
181193

@@ -212,6 +224,8 @@ enum InboundEvent {
212224
ReadyLightClientUpdate(QueuedLightClientUpdateId),
213225
/// A backfill batch that was queued is ready for processing.
214226
ReadyBackfillSync(QueuedBackfillBatch),
227+
/// A column reconstruction that was queued is ready for processing.
228+
ReadyColumnReconstruction(QueuedColumnReconstruction),
215229
/// A message sent to the `ReprocessQueue`
216230
Msg(ReprocessQueueMessage),
217231
}
@@ -234,6 +248,8 @@ struct ReprocessQueue<S> {
234248
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
235249
/// Queue to manage scheduled sampling requests
236250
sampling_requests_delay_queue: DelayQueue<QueuedSamplingRequestId>,
251+
/// Queue to manage scheduled column reconstructions.
252+
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
237253

238254
/* Queued items */
239255
/// Queued blocks.
@@ -252,6 +268,8 @@ struct ReprocessQueue<S> {
252268
queued_sampling_requests: FnvHashMap<usize, (QueuedSamplingRequest, DelayKey)>,
253269
/// Sampling requests per block root.
254270
awaiting_sampling_requests_per_block_root: HashMap<Hash256, Vec<QueuedSamplingRequestId>>,
271+
/// Column reconstruction per block root.
272+
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
255273
/// Queued backfill batches
256274
queued_backfill_batches: Vec<QueuedBackfillBatch>,
257275

@@ -343,6 +361,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
343361
Poll::Ready(None) | Poll::Pending => (),
344362
}
345363

364+
match self.column_reconstructions_delay_queue.poll_expired(cx) {
365+
Poll::Ready(Some(reconstruction)) => {
366+
return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction(
367+
reconstruction.into_inner(),
368+
)));
369+
}
370+
Poll::Ready(None) | Poll::Pending => (),
371+
}
372+
346373
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
347374
match next_backfill_batch_event.as_mut().poll(cx) {
348375
Poll::Ready(_) => {
@@ -410,6 +437,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
410437
attestations_delay_queue: DelayQueue::new(),
411438
lc_updates_delay_queue: DelayQueue::new(),
412439
sampling_requests_delay_queue: <_>::default(),
440+
column_reconstructions_delay_queue: DelayQueue::new(),
413441
queued_gossip_block_roots: HashSet::new(),
414442
queued_lc_updates: FnvHashMap::default(),
415443
queued_aggregates: FnvHashMap::default(),
@@ -419,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
419447
awaiting_lc_updates_per_parent_root: HashMap::new(),
420448
awaiting_sampling_requests_per_block_root: <_>::default(),
421449
queued_backfill_batches: Vec::new(),
450+
queued_column_reconstructions: HashMap::new(),
422451
next_attestation: 0,
423452
next_lc_update: 0,
424453
next_sampling_request_update: 0,
@@ -817,6 +846,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
817846
self.recompute_next_backfill_batch_event();
818847
}
819848
}
849+
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
850+
match self.queued_column_reconstructions.entry(request.block_root) {
851+
Entry::Occupied(key) => {
852+
// Push back the reattempted reconstruction
853+
self.column_reconstructions_delay_queue
854+
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
855+
}
856+
Entry::Vacant(vacant) => {
857+
let delay_key = self
858+
.column_reconstructions_delay_queue
859+
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
860+
vacant.insert(delay_key);
861+
}
862+
}
863+
}
820864
// A block that was queued for later processing is now ready to be processed.
821865
InboundEvent::ReadyGossipBlock(ready_block) => {
822866
let block_root = ready_block.beacon_block_root;
@@ -940,6 +984,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
940984
_ => crit!("Unexpected return from try_send error"),
941985
}
942986
}
987+
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
988+
self.queued_column_reconstructions
989+
.remove(&column_reconstruction.block_root);
990+
if self
991+
.ready_work_tx
992+
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
993+
.is_err()
994+
{
995+
error!(
996+
hint = "system may be overloaded",
997+
"Ignored scheduled column reconstruction"
998+
);
999+
}
1000+
}
9431001
}
9441002

9451003
metrics::set_gauge_vec(

beacon_node/network/src/network_beacon_processor/gossip_methods.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use std::sync::Arc;
3232
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3333
use store::hot_cold_store::HotColdDBError;
3434
use tokio::sync::mpsc;
35+
use tokio::sync::mpsc::error::SendError;
3536
use tracing::{debug, error, info, trace, warn};
3637
use types::{
3738
beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef,
@@ -42,6 +43,7 @@ use types::{
4243
SyncCommitteeMessage, SyncSubnetId,
4344
};
4445

46+
use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
4547
use beacon_processor::{
4648
work_reprocessing_queue::{
4749
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
@@ -1173,8 +1175,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
11731175
"Processed data column, waiting for other components"
11741176
);
11751177

1176-
self.attempt_data_column_reconstruction(block_root, true)
1178+
// Instead of triggering reconstruction immediately, schedule it to be run. If
1179+
// another column arrives it either completes availability or pushes
1180+
// reconstruction back a bit.
1181+
let cloned_self = Arc::clone(self);
1182+
let send_result = self
1183+
.reprocess_tx
1184+
.send(ReprocessQueueMessage::DelayColumnReconstruction(
1185+
QueuedColumnReconstruction {
1186+
block_root,
1187+
process_fn: Box::pin(async move {
1188+
cloned_self
1189+
.attempt_data_column_reconstruction(block_root, true)
1190+
.await;
1191+
}),
1192+
},
1193+
))
11771194
.await;
1195+
if let Err(SendError(ReprocessQueueMessage::DelayColumnReconstruction(
1196+
reconstruction,
1197+
))) = send_result
1198+
{
1199+
warn!("Unable to send reconstruction to reprocessing");
1200+
// Execute it immediately instead.
1201+
reconstruction.process_fn.await;
1202+
}
11781203
}
11791204
},
11801205
Err(BlockError::DuplicateFullyImported(_)) => {

beacon_node/network/src/network_beacon_processor/tests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,10 @@ async fn import_gossip_block_acceptably_early() {
729729
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
730730
.await;
731731
}
732+
if num_data_columns > 0 {
733+
rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction])
734+
.await;
735+
}
732736

733737
// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
734738
// and check the head in the time between the block arrived early and when its due for

0 commit comments

Comments
 (0)