Commit a64cee3

Merge remote-tracking branch 'origin/stable' into unstable

2 parents 784ef5f + 9e12c21

18 files changed: +294, -200 lines

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default.

beacon_node/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "beacon_node"
-version = "5.2.0"
+version = "5.2.1"
 authors = [
     "Paul Hauner <[email protected]>",
     "Age Manning <[email protected]>",

beacon_node/lighthouse_network/src/rpc/handler.rs

Lines changed: 0 additions & 31 deletions
@@ -352,37 +352,6 @@ where
         !matches!(self.state, HandlerState::Deactivated)
     }

-    // NOTE: This function gets polled to completion upon a connection close.
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
-        // Inform the network behaviour of any failed requests
-
-        while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
-            let outbound_info = self
-                .outbound_substreams
-                .remove(&substream_id)
-                .expect("The value must exist for a key");
-            // If the state of the connection is closing, we do not need to report this case to
-            // the behaviour, as the connection has just closed non-gracefully
-            if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
-                continue;
-            }
-
-            // Register this request as an RPC Error
-            return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
-                error: RPCError::Disconnected,
-                proto: outbound_info.proto,
-                id: outbound_info.req_id,
-            })));
-        }
-
-        // Also handle any events that are awaiting to be sent to the behaviour
-        if !self.events_out.is_empty() {
-            return Poll::Ready(Some(self.events_out.remove(0)));
-        }
-
-        Poll::Ready(None)
-    }
-
     fn poll(
         &mut self,
         cx: &mut Context<'_>,
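The removed hook drained every pending outbound substream during connection shutdown and surfaced each one to the behaviour as an RPC error before allowing the close to complete. The following is a minimal standalone model of that drain-and-report pattern; `Handler`, `Event`, and `SubstreamState` are illustrative stand-ins, not Lighthouse's real types.

use std::collections::HashMap;
use std::task::Poll;

// Illustrative stand-ins for the handler's real state types.
enum SubstreamState {
    Active,
    Closing,
}

struct OutboundInfo {
    state: SubstreamState,
    req_id: u64,
}

enum Event {
    OutboundError { id: u64 },
}

struct Handler {
    outbound_substreams: HashMap<u64, OutboundInfo>,
    events_out: Vec<Event>,
}

impl Handler {
    // Mirrors the removed hook: polled repeatedly while the connection closes,
    // reporting one pending request per call until everything is drained.
    fn poll_close(&mut self) -> Poll<Option<Event>> {
        while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
            let info = self
                .outbound_substreams
                .remove(&substream_id)
                .expect("The value must exist for a key");
            // Substreams already closing need no report; that close was expected.
            if matches!(info.state, SubstreamState::Closing) {
                continue;
            }
            // Report this request as failed by the disconnect.
            return Poll::Ready(Some(Event::OutboundError { id: info.req_id }));
        }
        // Flush any queued events, then signal that the close may complete.
        if !self.events_out.is_empty() {
            return Poll::Ready(Some(self.events_out.remove(0)));
        }
        Poll::Ready(None)
    }
}

fn main() {
    let mut handler = Handler {
        outbound_substreams: HashMap::from([(
            0,
            OutboundInfo { state: SubstreamState::Active, req_id: 42 },
        )]),
        events_out: Vec::new(),
    };
    // The first poll surfaces the orphaned request; the second lets the close finish.
    assert!(matches!(
        handler.poll_close(),
        Poll::Ready(Some(Event::OutboundError { id: 42 }))
    ));
    assert!(matches!(handler.poll_close(), Poll::Ready(None)));
}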

beacon_node/lighthouse_network/tests/rpc_tests.rs

Lines changed: 1 addition & 93 deletions
@@ -3,7 +3,7 @@
 mod common;

 use common::Protocol;
-use lighthouse_network::rpc::{methods::*, RPCError};
+use lighthouse_network::rpc::methods::*;
 use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
 use slog::{debug, warn, Level};
 use ssz::Encode;
@@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
     })
 }

-#[test]
-fn test_disconnect_triggers_rpc_error() {
-    // set up the logging. The level and enabled logging or not
-    let log_level = Level::Debug;
-    let enable_logging = false;
-
-    let log = common::build_log(log_level, enable_logging);
-    let spec = E::default_spec();
-
-    let rt = Arc::new(Runtime::new().unwrap());
-    // get sender/receiver
-    rt.block_on(async {
-        let (mut sender, mut receiver) = common::build_node_pair(
-            Arc::downgrade(&rt),
-            &log,
-            ForkName::Base,
-            &spec,
-            Protocol::Tcp,
-        )
-        .await;
-
-        // BlocksByRoot Request
-        let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
-            // Must have at least one root for the request to create a stream
-            vec![Hash256::from_low_u64_be(0)],
-            &spec,
-        ));
-
-        // build the sender future
-        let sender_future = async {
-            loop {
-                match sender.next_event().await {
-                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
-                        // Send a STATUS message
-                        debug!(log, "Sending RPC");
-                        sender
-                            .send_request(peer_id, 42, rpc_request.clone())
-                            .unwrap();
-                    }
-                    NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
-                        RPCError::Disconnected => return,
-                        other => panic!("received unexpected error {:?}", other),
-                    },
-                    other => {
-                        warn!(log, "Ignoring other event {:?}", other);
-                    }
-                }
-            }
-        };
-
-        // determine messages to send (PeerId, RequestId). If some, indicates we still need to send
-        // messages
-        let mut sending_peer = None;
-        let receiver_future = async {
-            loop {
-                // this future either drives the sending/receiving or times out allowing messages to be
-                // sent in the timeout
-                match futures::future::select(
-                    Box::pin(receiver.next_event()),
-                    Box::pin(tokio::time::sleep(Duration::from_secs(1))),
-                )
-                .await
-                {
-                    futures::future::Either::Left((ev, _)) => match ev {
-                        NetworkEvent::RequestReceived { peer_id, .. } => {
-                            sending_peer = Some(peer_id);
-                        }
-                        other => {
-                            warn!(log, "Ignoring other event {:?}", other);
-                        }
-                    },
-                    futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
-                }
-
-                // if we need to send messages send them here. This will happen after a delay
-                if let Some(peer_id) = sending_peer.take() {
-                    warn!(log, "Receiver got request, disconnecting peer");
-                    receiver.__hard_disconnect_testing_only(peer_id);
-                }
-            }
-        };
-
-        tokio::select! {
-            _ = sender_future => {}
-            _ = receiver_future => {}
-            _ = sleep(Duration::from_secs(30)) => {
-                panic!("Future timed out");
-            }
-        }
-    })
-}
-
 /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
 /// Goodbye message.
 fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
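The removed test used a common harness shape: drive both peers' event loops concurrently and convert a hang into an explicit failure via an overall deadline. A minimal runnable sketch of that pattern, assuming the `tokio` crate (with `macros`, `rt`, and `time` features) and the `futures` crate; the stand-in futures replace the real peer event loops.

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for the sender's event loop: completes once its work is done.
    let sender_future = async {
        tokio::time::sleep(Duration::from_millis(10)).await;
    };
    // Stand-in for the receiver's event loop: runs until the test ends.
    let receiver_future = async {
        futures::future::pending::<()>().await;
    };
    // Whichever side finishes first ends the test; the outer sleep is the
    // overall deadline that turns a hung test into an explicit panic.
    tokio::select! {
        _ = sender_future => {}
        _ = receiver_future => {}
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            panic!("Future timed out");
        }
    }
}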

beacon_node/network/src/sync/backfill_sync/mod.rs

Lines changed: 36 additions & 2 deletions
@@ -307,15 +307,49 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
     /// A peer has disconnected.
     /// If the peer has active batches, those are considered failed and re-requested.
     #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
-    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
+    pub fn peer_disconnected(
+        &mut self,
+        peer_id: &PeerId,
+        network: &mut SyncNetworkContext<T>,
+    ) -> Result<(), BackFillError> {
         if matches!(
             self.state(),
             BackFillState::Failed | BackFillState::NotRequired
         ) {
             return Ok(());
         }

-        self.active_requests.remove(peer_id);
+        if let Some(batch_ids) = self.active_requests.remove(peer_id) {
+            // fail the batches.
+            for id in batch_ids {
+                if let Some(batch) = self.batches.get_mut(&id) {
+                    match batch.download_failed(false) {
+                        Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
+                            self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
+                        }
+                        Ok(BatchOperationOutcome::Continue) => {}
+                        Err(e) => {
+                            self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
+                        }
+                    }
+                    // If we have run out of peers in which to retry this batch, the backfill state
+                    // transitions to a paused state.
+                    // We still need to reset the state for all the affected batches, so we should not
+                    // short circuit early.
+                    if self.retry_batch_download(network, id).is_err() {
+                        debug!(
+                            self.log,
+                            "Batch could not be retried";
+                            "batch_id" => id,
+                            "error" => "no synced peers"
+                        );
+                    }
+                } else {
+                    debug!(self.log, "Batch not found while removing peer";
+                        "peer" => %peer_id, "batch" => id)
+                }
+            }
+        }

         // Remove the peer from the participation list
         self.participating_peers.remove(peer_id);
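With this change, a disconnect marks each of the peer's active batches as a failed download attempt and immediately tries to re-request it from another synced peer, rather than silently dropping the in-flight requests. A standalone sketch of the failure-counting idea behind `download_failed`; the types are illustrative and `MAX_DOWNLOAD_ATTEMPTS` is an assumed placeholder, not Lighthouse's actual constant.

// Outcome of recording a failed download attempt against a batch.
enum BatchOutcome {
    Continue,
    Failed,
}

#[derive(Default)]
struct Batch {
    failed_download_attempts: usize,
}

impl Batch {
    // Illustrative retry limit; the real value may differ.
    const MAX_DOWNLOAD_ATTEMPTS: usize = 5;

    // Record a failed download and report whether the batch is beyond retry.
    fn download_failed(&mut self) -> BatchOutcome {
        self.failed_download_attempts += 1;
        if self.failed_download_attempts >= Self::MAX_DOWNLOAD_ATTEMPTS {
            BatchOutcome::Failed
        } else {
            BatchOutcome::Continue
        }
    }
}

fn main() {
    let mut batch = Batch::default();
    // One disconnect is a single failed attempt: the batch stays retryable and
    // would be re-requested from another synced peer.
    assert!(matches!(batch.download_failed(), BatchOutcome::Continue));
}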

beacon_node/network/src/sync/block_lookups/mod.rs

Lines changed: 40 additions & 24 deletions
@@ -1,3 +1,25 @@
+//! Implements block lookup sync.
+//!
+//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about.
+//! For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync
+//! is recursive in nature, as we may discover that this attested head block root has a parent that
+//! is also unknown to us.
+//!
+//! Block lookup is implemented as an event-driven state machine. It sends events to the network and
+//! beacon processor, and expects some set of events back. A discrepancy in the expected event API
+//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event
+//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups
+//! that live for too long, logging the line "Notify the devs a sync lookup is stuck".
+//!
+//! The expected event API is documented in the code paths that are making assumptions with the
+//! comment prefix "Lookup sync event safety:"
+//!
+//! Block lookup sync attempts to not re-download or re-process data that we already have. Block
+//! components are cached temporarily in multiple places before they are imported into fork-choice.
+//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download
+//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state
+//! returned to this module as `LookupRequestResult` variants.
+
 use self::parent_chain::{compute_parent_chains, NodeChain};
 pub use self::single_block_lookup::DownloadResult;
 use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
@@ -277,7 +299,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             }
         }

-        if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers) {
+        if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
             warn!(self.log, "Error adding peers to ancestor lookup"; "error" => ?e);
         }

@@ -426,21 +448,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
     /* Error responses */

     pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
-        self.single_block_lookups.retain(|_, lookup| {
+        for (_, lookup) in self.single_block_lookups.iter_mut() {
             lookup.remove_peer(peer_id);
-
-            // Note: this condition should be removed in the future. It's not strictly necessary to drop a
-            // lookup if there are no peers left. Lookup should only be dropped if it can not make progress
-            if lookup.has_no_peers() {
-                debug!(self.log,
-                    "Dropping single lookup after peer disconnection";
-                    "block_root" => ?lookup.block_root()
-                );
-                false
-            } else {
-                true
-            }
-        });
+        }
     }

     /* Processing responses */
@@ -803,12 +813,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         };

         if stuck_lookup.id == ancestor_stuck_lookup.id {
-            warn!(self.log, "Notify the devs, a sync lookup is stuck";
+            warn!(self.log, "Notify the devs a sync lookup is stuck";
                 "block_root" => ?stuck_lookup.block_root(),
                 "lookup" => ?stuck_lookup,
            );
         } else {
-            warn!(self.log, "Notify the devs, a sync lookup is stuck";
+            warn!(self.log, "Notify the devs a sync lookup is stuck";
                 "block_root" => ?stuck_lookup.block_root(),
                 "lookup" => ?stuck_lookup,
                 "ancestor_block_root" => ?ancestor_stuck_lookup.block_root(),
@@ -850,37 +860,43 @@
         &mut self,
         lookup_id: SingleLookupId,
         peers: &[PeerId],
+        cx: &mut SyncNetworkContext<T>,
     ) -> Result<(), String> {
         let lookup = self
             .single_block_lookups
             .get_mut(&lookup_id)
             .ok_or(format!("Unknown lookup for id {lookup_id}"))?;

+        let mut added_some_peer = false;
         for peer in peers {
             if lookup.add_peer(*peer) {
+                added_some_peer = true;
                 debug!(self.log, "Adding peer to existing single block lookup";
                     "block_root" => ?lookup.block_root(),
                     "peer" => ?peer
                 );
             }
         }

-        // We may choose to attempt to continue a lookup here. It is possible that a lookup had zero
-        // peers and after adding this set of peers it can make progress again. Note that this
-        // recursive function iterates from child to parent, so continuing the child first is weird.
-        // However, we choose to not attempt to continue the lookup for simplicity. It's not
-        // strictly required and just and optimization for a rare corner case.
-
         if let Some(parent_root) = lookup.awaiting_parent() {
             if let Some((&child_id, _)) = self
                 .single_block_lookups
                 .iter()
                 .find(|(_, l)| l.block_root() == parent_root)
             {
-                self.add_peers_to_lookup_and_ancestors(child_id, peers)
+                self.add_peers_to_lookup_and_ancestors(child_id, peers, cx)
             } else {
                 Err(format!("Lookup references unknown parent {parent_root:?}"))
             }
+        } else if added_some_peer {
+            // If this lookup is not awaiting a parent and we added at least one peer, attempt to
+            // make progress. It is possible that a lookup is created with zero peers, attempted to
+            // make progress, and then receives peers. After that time the lookup will never be
+            // pruned with `drop_lookups_without_peers` because it has peers. This is rare corner
+            // case, but it can result in stuck lookups.
+            let result = lookup.continue_requests(cx);
+            self.on_lookup_result(lookup_id, result, "add_peers", cx);
+            Ok(())
         } else {
             Ok(())
         }