Skip to content

Commit b2f7a5b

Browse files
committed
correctly reschedule reconstruction
1 parent f813767 commit b2f7a5b

File tree

3 files changed

+41
-13
lines changed

3 files changed

+41
-13
lines changed

beacon_node/beacon_processor/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -500,10 +500,12 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
500500
drop_during_sync: false,
501501
work: Work::ChainSegmentBackfill(process_fn),
502502
},
503-
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction(process_fn)) => Self {
504-
drop_during_sync: true,
505-
work: Work::ColumnReconstruction(process_fn),
506-
},
503+
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
504+
Self {
505+
drop_during_sync: true,
506+
work: Work::ColumnReconstruction(process_fn),
507+
}
508+
}
507509
}
508510
}
509511
}

beacon_node/beacon_processor/src/work_reprocessing_queue.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use itertools::Itertools;
1919
use logging::crit;
2020
use logging::TimeLatch;
2121
use slot_clock::SlotClock;
22+
use std::collections::hash_map::Entry;
2223
use std::collections::{HashMap, HashSet};
2324
use std::future::Future;
2425
use std::pin::Pin;
@@ -182,7 +183,10 @@ pub struct IgnoredRpcBlock {
182183
/// A backfill batch work that has been queued for processing later.
183184
pub struct QueuedBackfillBatch(pub AsyncFn);
184185

185-
pub struct QueuedColumnReconstruction(pub AsyncFn);
186+
pub struct QueuedColumnReconstruction {
187+
pub block_root: Hash256,
188+
pub process_fn: AsyncFn,
189+
}
186190

187191
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
188192
type Error = WorkEvent<E>;
@@ -264,6 +268,8 @@ struct ReprocessQueue<S> {
264268
queued_sampling_requests: FnvHashMap<usize, (QueuedSamplingRequest, DelayKey)>,
265269
/// Sampling requests per block root.
266270
awaiting_sampling_requests_per_block_root: HashMap<Hash256, Vec<QueuedSamplingRequestId>>,
271+
/// Column reconstruction per block root.
272+
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
267273
/// Queued backfill batches
268274
queued_backfill_batches: Vec<QueuedBackfillBatch>,
269275

@@ -441,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
441447
awaiting_lc_updates_per_parent_root: HashMap::new(),
442448
awaiting_sampling_requests_per_block_root: <_>::default(),
443449
queued_backfill_batches: Vec::new(),
450+
queued_column_reconstructions: HashMap::new(),
444451
next_attestation: 0,
445452
next_lc_update: 0,
446453
next_sampling_request_update: 0,
@@ -840,8 +847,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
840847
}
841848
}
842849
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
843-
self.column_reconstructions_delay_queue
844-
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
850+
match self.queued_column_reconstructions.entry(request.block_root) {
851+
Entry::Occupied(key) => {
852+
// Push back the reattempted reconstruction
853+
self.column_reconstructions_delay_queue
854+
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
855+
}
856+
Entry::Vacant(vacant) => {
857+
let delay_key = self
858+
.column_reconstructions_delay_queue
859+
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
860+
vacant.insert(delay_key);
861+
}
862+
}
845863
}
846864
// A block that was queued for later processing is now ready to be processed.
847865
InboundEvent::ReadyGossipBlock(ready_block) => {
@@ -967,6 +985,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
967985
}
968986
}
969987
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
988+
self.queued_column_reconstructions
989+
.remove(&column_reconstruction.block_root);
970990
if self
971991
.ready_work_tx
972992
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use beacon_chain::{
1212
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
1313
ReconstructionOutcome,
1414
};
15+
use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
1516
use beacon_processor::{
1617
work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache,
1718
GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent,
@@ -973,11 +974,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
973974
let send_result = self
974975
.reprocess_tx
975976
.send(ReprocessQueueMessage::DelayColumnReconstruction(
976-
QueuedColumnReconstruction(Box::pin(async move {
977-
cloned_self
978-
.attempt_data_column_reconstruction(block_root, publish_columns)
979-
.await;
980-
})),
977+
QueuedColumnReconstruction {
978+
block_root,
979+
process_fn: Box::pin(async move {
980+
cloned_self
981+
.attempt_data_column_reconstruction(
982+
block_root,
983+
publish_columns,
984+
)
985+
.await;
986+
}),
987+
},
981988
))
982989
.await;
983990

@@ -1167,7 +1174,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
11671174
}
11681175
}
11691176

1170-
use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
11711177
#[cfg(test)]
11721178
use {
11731179
beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend},

0 commit comments

Comments
 (0)