
Commit 03c0362

AgeManning authored and danielrachi1 committed
Add gossipsub as a Lighthouse behaviour (sigp#5066)
* Move gossipsub as a lighthouse behaviour
* Update dependencies, pin to corrected libp2p version
* Merge latest unstable
* Fix test
* Remove unused dep
* Fix cargo.lock
* Re-order behaviour, pin upstream libp2p
* Pin discv5 to latest version
1 parent 4fcfec5 commit 03c0362
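The commit message describes moving gossipsub from an upstream libp2p feature into the lighthouse_network crate, where it is composed as one of Lighthouse's network behaviours. As a rough, hypothetical sketch only (the real Behaviour struct in beacon_node/lighthouse_network is larger and is not part of this excerpt), combining a crate-local gossipsub behaviour with other libp2p 0.53 behaviours can look like this:

use libp2p::identify;
use libp2p::swarm::NetworkBehaviour;

// Hypothetical example: the field and type names are illustrative and are not
// the actual Lighthouse `Behaviour` definition.
#[derive(NetworkBehaviour)]
struct ExampleBehaviour {
    // The gossipsub implementation vendored into lighthouse_network by this
    // commit (initially a copy of upstream `libp2p::gossipsub`).
    gossipsub: crate::gossipsub::Behaviour,
    // Other upstream libp2p behaviours continue to be used unchanged.
    identify: identify::Behaviour,
}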

41 files changed: +17,720 -182 lines

Cargo.lock

Lines changed: 460 additions & 154 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 1 deletion
@@ -105,7 +105,8 @@ criterion = "0.3"
 delay_map = "0.3"
 derivative = "2"
 dirs = "3"
-discv5 = { git="https://github.com/sigp/discv5", rev="e30a2c31b7ac0c57876458b971164654dfa4513b", features = ["libp2p"] }
+either = "1.9"
+discv5 = { version = "0.4.1", features = ["libp2p"] }
 env_logger = "0.9"
 error-chain = "0.12"
 ethereum-types = "0.14"
@@ -160,6 +161,7 @@ tempfile = "3"
 tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
 tokio-stream = { version = "0.1", features = ["sync"] }
 tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
+tracing = "0.1.40"
 tracing-appender = "0.2"
 tracing-core = "0.1"
 tracing-log = "0.2"

beacon_node/lighthouse_network/Cargo.toml

Lines changed: 21 additions & 5 deletions
@@ -42,14 +42,29 @@ superstruct = { workspace = true }
 prometheus-client = "0.22.0"
 unused_port = { workspace = true }
 delay_map = { workspace = true }
-void = "1"
-libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p/", rev = "cfa3275ca17e502799ed56e555b6c0611752e369" }
+tracing = { workspace = true }
+byteorder = { workspace = true }
+bytes = { workspace = true }
+either = { workspace = true }
+
+# Local dependencies
+futures-ticker = "0.0.3"
+futures-timer = "3.0.2"
+getrandom = "0.2.11"
+hex_fmt = "0.3.0"
+instant = "0.1.12"
+quick-protobuf = "0.8"
+void = "1.0.2"
+async-channel = "1.9.0"
+asynchronous-codec = "0.7.0"
+base64 = "0.21.5"
+libp2p-mplex = "0.41"
+quick-protobuf-codec = "0.3"
 
 [dependencies.libp2p]
-git = "https://github.com/sigp/rust-libp2p/"
-rev = "cfa3275ca17e502799ed56e555b6c0611752e369"
+version = "0.53"
 default-features = false
-features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]
+features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]
 
 [dev-dependencies]
 slog-term = { workspace = true }
@@ -58,6 +73,7 @@ tempfile = { workspace = true }
 exit-future = { workspace = true }
 quickcheck = { workspace = true }
 quickcheck_macros = { workspace = true }
+async-std = { version = "1.6.3", features = ["unstable"] }
 
 [features]
 libp2p-websocket = []
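These manifest changes remove the "gossipsub" feature from the upstream libp2p dependency, switch libp2p and libp2p-mplex from a sigp fork to crates.io releases, and add the crates gossipsub itself needs (quick-protobuf, asynchronous-codec, futures-ticker, and so on), consistent with the implementation now being compiled inside lighthouse_network. A hypothetical sketch of how the vendored module might be wired in (the actual lib.rs change is not shown in this excerpt):

// Hypothetical: module declaration in beacon_node/lighthouse_network/src/lib.rs.
pub mod gossipsub;

// Call sites elsewhere in the crate then import the crate-local module
// instead of `libp2p::gossipsub`:
use crate::gossipsub::{IdentTopic, MessageAuthenticity};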

beacon_node/lighthouse_network/src/config.rs

Lines changed: 2 additions & 2 deletions
@@ -1,11 +1,11 @@
+use crate::gossipsub;
 use crate::listen_addr::{ListenAddr, ListenAddress};
 use crate::rpc::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
 use crate::types::GossipKind;
 use crate::{Enr, PeerIdSerialized};
 use directory::{
     DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
 };
-use libp2p::gossipsub;
 use libp2p::Multiaddr;
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
@@ -158,7 +158,7 @@ pub struct Config {
     /// Configuration for the inbound rate limiter (requests received by this node).
     pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
 
-    /// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
+    /// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
     /// errors will be logged at DEBUG level.
     pub disable_duplicate_warn_logs: bool,
 }
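The only change in config.rs is the import swap from libp2p::gossipsub to the crate-local crate::gossipsub (plus what appears to be a whitespace-only change in a doc comment), so code that builds the gossipsub configuration keeps compiling against the same API. A hedged sketch of such a call site, assuming the vendored module keeps upstream's ConfigBuilder interface; the helper function and the specific values are made up for illustration:

use std::time::Duration;

use crate::gossipsub::{ConfigBuilder, ValidationMode};

// Illustrative helper, not a function from this diff.
fn example_gossipsub_config() -> crate::gossipsub::Config {
    ConfigBuilder::default()
        // Short heartbeat; the exact value is arbitrary here.
        .heartbeat_interval(Duration::from_millis(700))
        // Ethereum consensus gossip topics do not sign individual messages.
        .validation_mode(ValidationMode::Anonymous)
        .build()
        .expect("valid gossipsub configuration")
}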
beacon_node/lighthouse_network/src/gossipsub/backoff.rs

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Data structure for efficiently storing known back-off's when pruning peers.
use crate::gossipsub::topic::TopicHash;
use instant::Instant;
use libp2p::identity::PeerId;
use std::collections::{
    hash_map::{Entry, HashMap},
    HashSet,
};
use std::time::Duration;

#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);

/// Stores backoffs in an efficient manner.
pub(crate) struct BackoffStorage {
    /// Stores backoffs and the index in backoffs_by_heartbeat per peer per topic.
    backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
    /// Stores peer topic pairs per heartbeat (this is cyclic the current index is
    /// heartbeat_index).
    backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
    /// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat.
    heartbeat_index: HeartbeatIndex,
    /// The heartbeat interval duration from the config.
    heartbeat_interval: Duration,
    /// Backoff slack from the config.
    backoff_slack: u32,
}

impl BackoffStorage {
    fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
        ((d.as_nanos() + heartbeat_interval.as_nanos() - 1) / heartbeat_interval.as_nanos())
            as usize
    }

    pub(crate) fn new(
        prune_backoff: &Duration,
        heartbeat_interval: Duration,
        backoff_slack: u32,
    ) -> BackoffStorage {
        // We add one additional slot for partial heartbeat
        let max_heartbeats =
            Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
        BackoffStorage {
            backoffs: HashMap::new(),
            backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
            heartbeat_index: HeartbeatIndex(0),
            heartbeat_interval,
            backoff_slack,
        }
    }

    /// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
    /// doesn't change anything).
    pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
        let instant = Instant::now() + time;
        let insert_into_backoffs_by_heartbeat =
            |heartbeat_index: HeartbeatIndex,
             backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
             heartbeat_interval,
             backoff_slack| {
                let pair = (topic.clone(), *peer);
                let index = (heartbeat_index.0
                    + Self::heartbeats(&time, heartbeat_interval)
                    + backoff_slack as usize)
                    % backoffs_by_heartbeat.len();
                backoffs_by_heartbeat[index].insert(pair);
                HeartbeatIndex(index)
            };
        match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
            Entry::Occupied(mut o) => {
                let (backoff, index) = o.get();
                if backoff < &instant {
                    let pair = (topic.clone(), *peer);
                    if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
                        s.remove(&pair);
                    }
                    let index = insert_into_backoffs_by_heartbeat(
                        self.heartbeat_index,
                        &mut self.backoffs_by_heartbeat,
                        &self.heartbeat_interval,
                        self.backoff_slack,
                    );
                    o.insert((instant, index));
                }
            }
            Entry::Vacant(v) => {
                let index = insert_into_backoffs_by_heartbeat(
                    self.heartbeat_index,
                    &mut self.backoffs_by_heartbeat,
                    &self.heartbeat_interval,
                    self.backoff_slack,
                );
                v.insert((instant, index));
            }
        };
    }

    /// Checks if a given peer is backoffed for the given topic. This method respects the
    /// configured BACKOFF_SLACK and may return true even if the backup is already over.
    /// It is guaranteed to return false if the backoff is not over and eventually if enough time
    /// passed true if the backoff is over.
    ///
    /// This method should be used for deciding if we can already send a GRAFT to a previously
    /// backoffed peer.
    pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
        self.backoffs
            .get(topic)
            .map_or(false, |m| m.contains_key(peer))
    }

    pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
        Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
    }

    fn get_backoff_time_from_backoffs(
        backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
        topic: &TopicHash,
        peer: &PeerId,
    ) -> Option<Instant> {
        backoffs
            .get(topic)
            .and_then(|m| m.get(peer).map(|(i, _)| *i))
    }

    /// Applies a heartbeat. That should be called regularly in intervals of length
    /// `heartbeat_interval`.
    pub(crate) fn heartbeat(&mut self) {
        // Clean up backoffs_by_heartbeat
        if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
            let backoffs = &mut self.backoffs;
            let slack = self.heartbeat_interval * self.backoff_slack;
            let now = Instant::now();
            s.retain(|(topic, peer)| {
                let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) {
                    Some(backoff_time) => backoff_time + slack > now,
                    None => false,
                };
                if !keep {
                    // remove from backoffs
                    if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
                        if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
                            m.remove();
                        }
                    }
                }

                keep
            });
        }

        // Increase heartbeat index
        self.heartbeat_index =
            HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
    }
}
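BackoffStorage buckets (topic, peer) pairs into heartbeat slots so that expired PRUNE backoffs can be cleaned up slot-by-slot on each heartbeat instead of scanning every stored backoff. A hypothetical usage sketch, written as a unit test inside this module since the type is pub(crate); the test name and the concrete durations are made up:

#[cfg(test)]
mod usage_sketch {
    use super::*;
    use std::time::Duration;

    #[test]
    fn records_a_backoff_and_applies_heartbeats() {
        let heartbeat_interval = Duration::from_millis(700);
        let prune_backoff = Duration::from_secs(60);
        let mut storage = BackoffStorage::new(&prune_backoff, heartbeat_interval, 1);

        let topic = TopicHash::from_raw("example-topic");
        let peer = PeerId::random();

        // Record a 10 second backoff for this peer on this topic.
        storage.update_backoff(&topic, &peer, Duration::from_secs(10));

        // While the entry exists the peer should not be GRAFTed again.
        assert!(storage.is_backoff_with_slack(&topic, &peer));
        assert!(storage.get_backoff_time(&topic, &peer).is_some());

        // The behaviour is expected to call this once per heartbeat interval;
        // an entry is only garbage-collected when its slot comes around again
        // and the backoff (plus slack) has expired.
        storage.heartbeat();
    }
}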

0 commit comments
