Skip to content

Commit 90c30b3

Browse files
committed
Notify lookup sync of gossip processing results
1 parent ee974db commit 90c30b3

File tree

10 files changed

+194
-123
lines changed

10 files changed

+194
-123
lines changed

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
8484
})
8585
}
8686

87-
/// Checks if the block root is currenlty in the availability cache awaiting processing because
87+
/// Checks if the block root is currenlty in the availability cache awaiting import because
8888
/// of missing components.
8989
pub fn has_block(&self, block_root: &Hash256) -> bool {
9090
self.availability_cache.has_block(block_root)

beacon_node/network/src/network_beacon_processor/gossip_methods.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use crate::{
22
metrics,
33
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
44
service::NetworkMessage,
5-
sync::SyncMessage,
5+
sync::{
6+
manager::{BlockProcessSource, BlockProcessType},
7+
SyncMessage,
8+
},
69
};
710
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
811
use beacon_chain::block_verification_types::AsBlock;
@@ -1187,19 +1190,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
11871190
"block_root" => %block_root,
11881191
);
11891192
}
1190-
Err(BlockError::ParentUnknown(block)) => {
1191-
// Inform the sync manager to find parents for this block
1192-
// This should not occur. It should be checked by `should_forward_block`
1193+
Err(BlockError::ParentUnknown(_)) => {
1194+
// This should not occur. It should be checked by `should_forward_block`.
1195+
// Do not send sync message UnknownParentBlock to prevent conflicts with the
1196+
// BlockComponentProcessed message below. If this error ever happens, lookup sync
1197+
// can recover by receiving another block / blob / attestation referencing the
1198+
// chain that includes this block.
11931199
error!(
11941200
self.log,
11951201
"Block with unknown parent attempted to be processed";
1202+
"block_root" => %block_root,
11961203
"peer_id" => %peer_id
11971204
);
1198-
self.send_sync_message(SyncMessage::UnknownParentBlock(
1199-
peer_id,
1200-
block.clone(),
1201-
block_root,
1202-
));
12031205
}
12041206
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
12051207
debug!(
@@ -1263,6 +1265,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
12631265
&self.log,
12641266
);
12651267
}
1268+
1269+
self.send_sync_message(SyncMessage::BlockComponentProcessed {
1270+
process_type: BlockProcessType::SingleBlock,
1271+
source: BlockProcessSource::Gossip(block_root),
1272+
result: result.into(),
1273+
});
12661274
}
12671275

12681276
pub fn process_gossip_voluntary_exit(

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
service::NetworkMessage,
3-
sync::{manager::BlockProcessType, SyncMessage},
3+
sync::{manager::BlockProcessSource, SyncMessage},
44
};
55
use beacon_chain::block_verification_types::RpcBlock;
66
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
@@ -407,13 +407,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
407407
block_root: Hash256,
408408
block: RpcBlock<T::EthSpec>,
409409
seen_timestamp: Duration,
410-
process_type: BlockProcessType,
410+
source: BlockProcessSource,
411411
) -> Result<(), Error<T::EthSpec>> {
412412
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
413413
block_root,
414414
block,
415415
seen_timestamp,
416-
process_type,
416+
source,
417417
);
418418
self.try_send(BeaconWorkEvent {
419419
drop_during_sync: false,
@@ -428,18 +428,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
428428
block_root: Hash256,
429429
blobs: FixedBlobSidecarList<T::EthSpec>,
430430
seen_timestamp: Duration,
431-
process_type: BlockProcessType,
431+
source: BlockProcessSource,
432432
) -> Result<(), Error<T::EthSpec>> {
433433
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
434434
if blob_count == 0 {
435435
return Ok(());
436436
}
437-
let process_fn = self.clone().generate_rpc_blobs_process_fn(
438-
block_root,
439-
blobs,
440-
seen_timestamp,
441-
process_type,
442-
);
437+
let process_fn =
438+
self.clone()
439+
.generate_rpc_blobs_process_fn(block_root, blobs, seen_timestamp, source);
443440
self.try_send(BeaconWorkEvent {
444441
drop_during_sync: false,
445442
work: Work::RpcBlobs { process_fn },

beacon_node/network/src/network_beacon_processor/sync_methods.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::metrics;
22
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
3+
use crate::sync::manager::BlockProcessSource;
34
use crate::sync::BatchProcessResult;
45
use crate::sync::{
56
manager::{BlockProcessType, SyncMessage},
@@ -53,7 +54,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
5354
block_root: Hash256,
5455
block: RpcBlock<T::EthSpec>,
5556
seen_timestamp: Duration,
56-
process_type: BlockProcessType,
57+
source: BlockProcessSource,
5758
) -> AsyncFn {
5859
let process_fn = async move {
5960
let reprocess_tx = self.reprocess_tx.clone();
@@ -62,7 +63,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
6263
block_root,
6364
block,
6465
seen_timestamp,
65-
process_type,
66+
source,
6667
reprocess_tx,
6768
duplicate_cache,
6869
)
@@ -77,20 +78,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
7778
block_root: Hash256,
7879
block: RpcBlock<T::EthSpec>,
7980
seen_timestamp: Duration,
80-
process_type: BlockProcessType,
81+
source: BlockProcessSource,
8182
) -> (AsyncFn, BlockingFn) {
8283
// An async closure which will import the block.
8384
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
8485
block_root,
8586
block,
8687
seen_timestamp,
87-
process_type.clone(),
88+
source.clone(),
8889
);
8990
// A closure which will ignore the block.
9091
let ignore_fn = move || {
9192
// Sync handles these results
9293
self.send_sync_message(SyncMessage::BlockComponentProcessed {
93-
process_type,
94+
process_type: BlockProcessType::SingleBlock,
95+
source,
9496
result: crate::sync::manager::BlockProcessingResult::Ignored,
9597
});
9698
};
@@ -104,7 +106,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
104106
block_root: Hash256,
105107
block: RpcBlock<T::EthSpec>,
106108
seen_timestamp: Duration,
107-
process_type: BlockProcessType,
109+
source: BlockProcessSource,
108110
reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
109111
duplicate_cache: DuplicateCache,
110112
) {
@@ -115,15 +117,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
115117
"Gossip block is being processed";
116118
"action" => "sending rpc block to reprocessing queue",
117119
"block_root" => %block_root,
118-
"process_type" => ?process_type,
120+
"source" => ?source,
119121
);
120122

121123
// Send message to work reprocess queue to retry the block
122124
let (process_fn, ignore_fn) = self.clone().generate_rpc_beacon_block_fns(
123125
block_root,
124126
block,
125127
seen_timestamp,
126-
process_type,
128+
source,
127129
);
128130
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
129131
beacon_block_root: block_root,
@@ -148,7 +150,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
148150
"proposer" => block.message().proposer_index(),
149151
"slot" => block.slot(),
150152
"commitments" => commitments_formatted,
151-
"process_type" => ?process_type,
153+
"source" => ?source,
152154
);
153155

154156
let result = self
@@ -170,21 +172,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
170172
if reprocess_tx.try_send(reprocess_msg).is_err() {
171173
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
172174
};
173-
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
174-
self.chain.block_times_cache.write().set_time_observed(
175-
hash,
176-
slot,
177-
seen_timestamp,
178-
None,
179-
None,
180-
);
175+
self.chain.block_times_cache.write().set_time_observed(
176+
hash,
177+
slot,
178+
seen_timestamp,
179+
None,
180+
None,
181+
);
181182

182-
self.chain.recompute_head_at_current_slot().await;
183-
}
183+
self.chain.recompute_head_at_current_slot().await;
184184
}
185185
// Sync handles these results
186186
self.send_sync_message(SyncMessage::BlockComponentProcessed {
187-
process_type,
187+
process_type: BlockProcessType::SingleBlock,
188+
source,
188189
result: result.into(),
189190
});
190191

@@ -201,11 +202,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
201202
block_root: Hash256,
202203
blobs: FixedBlobSidecarList<T::EthSpec>,
203204
seen_timestamp: Duration,
204-
process_type: BlockProcessType,
205+
source: BlockProcessSource,
205206
) -> AsyncFn {
206207
let process_fn = async move {
207208
self.clone()
208-
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
209+
.process_rpc_blobs(block_root, blobs, seen_timestamp, source)
209210
.await;
210211
};
211212
Box::pin(process_fn)
@@ -217,7 +218,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
217218
block_root: Hash256,
218219
blobs: FixedBlobSidecarList<T::EthSpec>,
219220
seen_timestamp: Duration,
220-
process_type: BlockProcessType,
221+
source: BlockProcessSource,
221222
) {
222223
let Some(slot) = blobs
223224
.iter()
@@ -298,7 +299,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
298299

299300
// Sync handles these results
300301
self.send_sync_message(SyncMessage::BlockComponentProcessed {
301-
process_type,
302+
process_type: BlockProcessType::SingleBlob,
303+
source,
302304
result: result.into(),
303305
});
304306
}

beacon_node/network/src/sync/block_lookups/common.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::sync::block_lookups::single_block_lookup::{
44
use crate::sync::block_lookups::{
55
BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
66
};
7-
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
8-
use crate::sync::network_context::SyncNetworkContext;
7+
use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
8+
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
99
use beacon_chain::block_verification_types::RpcBlock;
1010
use beacon_chain::BeaconChainTypes;
1111
use std::sync::Arc;
@@ -66,11 +66,15 @@ pub trait RequestState<T: BeaconChainTypes> {
6666
.use_rand_available_peer()
6767
.ok_or(LookupRequestError::NoPeers)?;
6868

69-
// make_request returns true only if a request needs to be made
70-
if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
71-
self.get_state_mut().on_download_start()?;
72-
} else {
73-
self.get_state_mut().on_completed_request()?;
69+
match self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
70+
LookupRequestResult::RequestSent => self.get_state_mut().on_download_start()?,
71+
LookupRequestResult::NoRequestNeeded => {
72+
self.get_state_mut().on_completed_request()?
73+
}
74+
75+
LookupRequestResult::AwaitingOtherSource => {
76+
self.get_state_mut().on_processing_from_other_source()?
77+
}
7478
}
7579

7680
// Otherwise, attempt to progress awaiting processing
@@ -98,7 +102,7 @@ pub trait RequestState<T: BeaconChainTypes> {
98102
peer_id: PeerId,
99103
downloaded_block_expected_blobs: Option<usize>,
100104
cx: &mut SyncNetworkContext<T>,
101-
) -> Result<bool, LookupRequestError>;
105+
) -> Result<LookupRequestResult, LookupRequestError>;
102106

103107
/* Response handling methods */
104108

@@ -133,7 +137,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
133137
peer_id: PeerId,
134138
_: Option<usize>,
135139
cx: &mut SyncNetworkContext<T>,
136-
) -> Result<bool, LookupRequestError> {
140+
) -> Result<LookupRequestResult, LookupRequestError> {
137141
cx.block_lookup_request(id, peer_id, self.requested_block_root)
138142
.map_err(LookupRequestError::SendFailed)
139143
}
@@ -150,10 +154,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
150154
peer_id: _,
151155
} = download_result;
152156
cx.send_block_for_processing(
157+
id,
153158
block_root,
154159
RpcBlock::new_without_blobs(Some(block_root), value),
155160
seen_timestamp,
156-
BlockProcessType::SingleBlock { id },
157161
)
158162
.map_err(LookupRequestError::SendFailed)
159163
}
@@ -181,7 +185,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
181185
peer_id: PeerId,
182186
downloaded_block_expected_blobs: Option<usize>,
183187
cx: &mut SyncNetworkContext<T>,
184-
) -> Result<bool, LookupRequestError> {
188+
) -> Result<LookupRequestResult, LookupRequestError> {
185189
cx.blob_lookup_request(
186190
id,
187191
peer_id,
@@ -202,13 +206,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
202206
seen_timestamp,
203207
peer_id: _,
204208
} = download_result;
205-
cx.send_blobs_for_processing(
206-
block_root,
207-
value,
208-
seen_timestamp,
209-
BlockProcessType::SingleBlob { id },
210-
)
211-
.map_err(LookupRequestError::SendFailed)
209+
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
210+
.map_err(LookupRequestError::SendFailed)
212211
}
213212

214213
fn response_type() -> ResponseType {

0 commit comments

Comments
 (0)