Skip to content

Commit 3d99ce2

Browse files
committed
Correct a race condition when dialing peers (#4056)
There is a race condition which occurs when multiple discovery queries return at almost exactly the same time and each independently contains a useful peer we would like to connect to. In that case the same peer can be added to the dial queue more than once before we get a chance to process the queue. This ends up displaying an error to the user: `ERRO Dialing an already dialing peer`. Although this error is harmless, it's not ideal. There are two ways to resolve this: 1. As we decide to dial the peer, we change its state in the peer-db to dialing (before we add it to the queue), which would prevent other requests from adding it to the queue. 2. We prevent duplicates in the dial queue. This PR has opted for 2., because 1. would complicate the code by changing states in non-intuitive places. Although this technically adds a very slight performance cost, it's probably a cleaner solution, as we can keep the state-changing logic in one place.
1 parent 1ec3041 commit 3d99ce2

File tree

8 files changed

+41
-12
lines changed

8 files changed

+41
-12
lines changed

Cargo.lock

Lines changed: 14 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM rust:1.65.0-bullseye AS builder
1+
FROM rust:1.66.0-bullseye AS builder
22
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev protobuf-compiler
33
COPY . lighthouse
44
ARG FEATURES

beacon_node/lighthouse_network/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ strum = { version = "0.24.0", features = ["derive"] }
4242
superstruct = "0.5.0"
4343
prometheus-client = "0.18.0"
4444
unused_port = { path = "../../common/unused_port" }
45-
delay_map = "0.1.1"
45+
delay_map = "0.3.0"
4646
void = "1"
4747

4848
[dependencies.libp2p]

beacon_node/lighthouse_network/src/peer_manager/mod.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
1313
use rand::seq::SliceRandom;
1414
use slog::{debug, error, trace, warn};
1515
use smallvec::SmallVec;
16-
use std::collections::VecDeque;
16+
use std::collections::BTreeMap;
1717
use std::{
1818
sync::Arc,
1919
time::{Duration, Instant},
@@ -77,7 +77,7 @@ pub struct PeerManager<TSpec: EthSpec> {
7777
/// The target number of peers we would like to connect to.
7878
target_peers: usize,
7979
/// Peers queued to be dialed.
80-
peers_to_dial: VecDeque<(PeerId, Option<Enr>)>,
80+
peers_to_dial: BTreeMap<PeerId, Option<Enr>>,
8181
/// The number of temporarily banned peers. This is used to prevent instantaneous
8282
/// reconnection.
8383
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
@@ -308,7 +308,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
308308
/// proves resource constraining, we should switch to multiaddr dialling here.
309309
#[allow(clippy::mutable_key_type)]
310310
pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> {
311-
let mut to_dial_peers = Vec::new();
311+
let mut to_dial_peers = Vec::with_capacity(4);
312312

313313
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
314314
for (peer_id, min_ttl) in results {
@@ -398,7 +398,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
398398

399399
// A peer is being dialed.
400400
pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) {
401-
self.peers_to_dial.push_back((*peer_id, enr));
401+
self.peers_to_dial.insert(*peer_id, enr);
402402
}
403403

404404
/// Reports if a peer is banned or not.
@@ -1185,6 +1185,18 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
11851185

11861186
// Unban any peers that have served their temporary ban timeout
11871187
self.unban_temporary_banned_peers();
1188+
1189+
// Keep memory usage in check by shrinking mappings
1190+
self.shrink_mappings();
1191+
}
1192+
1193+
// Reduce memory footprint by routinely shrinking associated mappings.
1194+
fn shrink_mappings(&mut self) {
1195+
self.inbound_ping_peers.shrink_to(5);
1196+
self.outbound_ping_peers.shrink_to(5);
1197+
self.status_peers.shrink_to(5);
1198+
self.temporary_banned_peers.shrink_to_fit();
1199+
self.sync_committee_subnets.shrink_to_fit();
11881200
}
11891201

11901202
// Update metrics related to peer scoring.

beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
8989
self.events.shrink_to_fit();
9090
}
9191

92-
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() {
92+
if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() {
9393
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr);
9494
let handler = self.new_handler();
9595
return Poll::Ready(NetworkBehaviourAction::Dial {

beacon_node/network/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ if-addrs = "0.6.4"
4343
strum = "0.24.0"
4444
tokio-util = { version = "0.6.3", features = ["time"] }
4545
derivative = "2.2.0"
46-
delay_map = "0.1.1"
46+
delay_map = "0.3.0"
4747
ethereum-types = { version = "0.14.1", optional = true }
4848
operation_pool = { path = "../operation_pool" }
4949
execution_layer = { path = "../execution_layer" }

common/lru_cache/src/time.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ where
160160
self.map.contains(key)
161161
}
162162

163+
/// Shrink the mappings to fit the current size.
164+
pub fn shrink_to_fit(&mut self) {
165+
self.map.shrink_to_fit();
166+
self.list.shrink_to_fit();
167+
}
168+
163169
#[cfg(test)]
164170
#[track_caller]
165171
fn check_invariant(&self) {

lighthouse/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ version = "3.5.1"
44
authors = ["Sigma Prime <[email protected]>"]
55
edition = "2021"
66
autotests = false
7-
rust-version = "1.65"
7+
rust-version = "1.66"
88

99
[features]
1010
default = ["slasher-mdbx"]

0 commit comments

Comments
 (0)