Skip to content

Commit 1254abe

Browse files
committed
Move gossipsub as a lighthouse behaviour
1 parent 00af017 commit 1254abe

40 files changed

+17677
-64
lines changed

Cargo.lock

Lines changed: 391 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ delay_map = "0.3"
106106
derivative = "2"
107107
dirs = "3"
108108
discv5 = { git="https://github.com/sigp/discv5", rev="dbb4a718cd32eaed8127c3c8241bfd0fde9eb908", features = ["libp2p"] }
109+
either = "1.9"
109110
env_logger = "0.9"
110111
error-chain = "0.12"
111112
ethereum-types = "0.14"
@@ -160,6 +161,7 @@ tempfile = "3"
160161
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
161162
tokio-stream = { version = "0.1", features = ["sync"] }
162163
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
164+
tracing = "0.1.40"
163165
tree_hash = "0.5"
164166
tree_hash_derive = "0.5"
165167
url = "2"
@@ -168,6 +170,19 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
168170
warp = { git = "https://github.com/seanmonstar/warp.git", default-features = false, features = ["tls"] }
169171
zeroize = { version = "1", features = ["zeroize_derive"] }
170172
zip = "0.6"
173+
futures-ticker = "0.0.3"
174+
futures-timer = "3.0.2"
175+
getrandom = "0.2.11"
176+
hex_fmt = "0.3.0"
177+
instant = "0.1.12"
178+
quick-protobuf = "0.8"
179+
void = "1.0.2"
180+
async-channel = "1.9.0"
181+
asynchronous-codec = "0.7.0"
182+
base64 = "0.21.5"
183+
quick-protobuf-codec = { git = "https://github.com/sigp/rust-libp2p/", rev = "b96b90894faab0a1eed78e1c82c6452138a3538a" }
184+
libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p/", rev = "b96b90894faab0a1eed78e1c82c6452138a3538a" }
185+
libp2p = { git = "https://github.com/sigp/rust-libp2p/", rev = "b96b90894faab0a1eed78e1c82c6452138a3538a", default-features = false, features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"] }
171186

172187
# Local crates.
173188
account_utils = { path = "common/account_utils" }

beacon_node/lighthouse_network/Cargo.toml

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,23 @@ superstruct = { workspace = true }
4242
prometheus-client = "0.22.0"
4343
unused_port = { workspace = true }
4444
delay_map = { workspace = true }
45-
void = "1"
46-
libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p/", rev = "b96b90894faab0a1eed78e1c82c6452138a3538a" }
47-
48-
[dependencies.libp2p]
49-
git = "https://github.com/sigp/rust-libp2p/"
50-
rev = "b96b90894faab0a1eed78e1c82c6452138a3538a"
51-
default-features = false
52-
features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]
45+
tracing = { workspace = true }
46+
byteorder = { workspace = true }
47+
bytes = { workspace = true }
48+
either = { workspace = true }
49+
futures-ticker = { workspace = true }
50+
futures-timer = { workspace = true }
51+
getrandom = { workspace = true }
52+
hex_fmt = { workspace = true }
53+
instant = { workspace = true }
54+
quick-protobuf = { workspace = true }
55+
quick-protobuf-codec = { workspace = true }
56+
void = { workspace = true }
57+
async-channel = { workspace = true }
58+
asynchronous-codec = { workspace = true }
59+
base64 = { workspace = true }
60+
libp2p-mplex = { workspace = true }
61+
libp2p = {workspace = true }
5362

5463
[dev-dependencies]
5564
slog-term = { workspace = true }
@@ -58,6 +67,8 @@ tempfile = { workspace = true }
5867
exit-future = { workspace = true }
5968
quickcheck = { workspace = true }
6069
quickcheck_macros = { workspace = true }
70+
async-std = { version = "1.6.3", features = ["unstable"] }
71+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
6172

6273
[features]
6374
libp2p-websocket = []

beacon_node/lighthouse_network/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use crate::gossipsub;
12
use crate::listen_addr::{ListenAddr, ListenAddress};
23
use crate::rpc::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
34
use crate::types::GossipKind;
45
use crate::{Enr, PeerIdSerialized};
56
use directory::{
67
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
78
};
8-
use libp2p::gossipsub;
99
use libp2p::Multiaddr;
1010
use serde::{Deserialize, Serialize};
1111
use sha2::{Digest, Sha256};
@@ -158,7 +158,7 @@ pub struct Config {
158158
/// Configuration for the inbound rate limiter (requests received by this node).
159159
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
160160

161-
/// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
161+
/// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
162162
/// errors will be logged at DEBUG level.
163163
pub disable_duplicate_warn_logs: bool,
164164
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Copyright 2020 Sigma Prime Pty Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
//! Data structure for efficiently storing known back-off's when pruning peers.
22+
use crate::gossipsub::topic::TopicHash;
23+
use instant::Instant;
24+
use libp2p::identity::PeerId;
25+
use std::collections::{
26+
hash_map::{Entry, HashMap},
27+
HashSet,
28+
};
29+
use std::time::Duration;
30+
31+
#[derive(Copy, Clone)]
32+
struct HeartbeatIndex(usize);
33+
34+
/// Stores backoffs in an efficient manner.
35+
pub(crate) struct BackoffStorage {
36+
/// Stores backoffs and the index in backoffs_by_heartbeat per peer per topic.
37+
backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
38+
/// Stores peer topic pairs per heartbeat (this is cyclic the current index is
39+
/// heartbeat_index).
40+
backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
41+
/// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat.
42+
heartbeat_index: HeartbeatIndex,
43+
/// The heartbeat interval duration from the config.
44+
heartbeat_interval: Duration,
45+
/// Backoff slack from the config.
46+
backoff_slack: u32,
47+
}
48+
49+
impl BackoffStorage {
50+
fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
51+
((d.as_nanos() + heartbeat_interval.as_nanos() - 1) / heartbeat_interval.as_nanos())
52+
as usize
53+
}
54+
55+
pub(crate) fn new(
56+
prune_backoff: &Duration,
57+
heartbeat_interval: Duration,
58+
backoff_slack: u32,
59+
) -> BackoffStorage {
60+
// We add one additional slot for partial heartbeat
61+
let max_heartbeats =
62+
Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
63+
BackoffStorage {
64+
backoffs: HashMap::new(),
65+
backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
66+
heartbeat_index: HeartbeatIndex(0),
67+
heartbeat_interval,
68+
backoff_slack,
69+
}
70+
}
71+
72+
/// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
73+
/// doesn't change anything).
74+
pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
75+
let instant = Instant::now() + time;
76+
let insert_into_backoffs_by_heartbeat =
77+
|heartbeat_index: HeartbeatIndex,
78+
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
79+
heartbeat_interval,
80+
backoff_slack| {
81+
let pair = (topic.clone(), *peer);
82+
let index = (heartbeat_index.0
83+
+ Self::heartbeats(&time, heartbeat_interval)
84+
+ backoff_slack as usize)
85+
% backoffs_by_heartbeat.len();
86+
backoffs_by_heartbeat[index].insert(pair);
87+
HeartbeatIndex(index)
88+
};
89+
match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
90+
Entry::Occupied(mut o) => {
91+
let (backoff, index) = o.get();
92+
if backoff < &instant {
93+
let pair = (topic.clone(), *peer);
94+
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
95+
s.remove(&pair);
96+
}
97+
let index = insert_into_backoffs_by_heartbeat(
98+
self.heartbeat_index,
99+
&mut self.backoffs_by_heartbeat,
100+
&self.heartbeat_interval,
101+
self.backoff_slack,
102+
);
103+
o.insert((instant, index));
104+
}
105+
}
106+
Entry::Vacant(v) => {
107+
let index = insert_into_backoffs_by_heartbeat(
108+
self.heartbeat_index,
109+
&mut self.backoffs_by_heartbeat,
110+
&self.heartbeat_interval,
111+
self.backoff_slack,
112+
);
113+
v.insert((instant, index));
114+
}
115+
};
116+
}
117+
118+
/// Checks if a given peer is backoffed for the given topic. This method respects the
119+
/// configured BACKOFF_SLACK and may return true even if the backup is already over.
120+
/// It is guaranteed to return false if the backoff is not over and eventually if enough time
121+
/// passed true if the backoff is over.
122+
///
123+
/// This method should be used for deciding if we can already send a GRAFT to a previously
124+
/// backoffed peer.
125+
pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
126+
self.backoffs
127+
.get(topic)
128+
.map_or(false, |m| m.contains_key(peer))
129+
}
130+
131+
pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
132+
Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
133+
}
134+
135+
fn get_backoff_time_from_backoffs(
136+
backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
137+
topic: &TopicHash,
138+
peer: &PeerId,
139+
) -> Option<Instant> {
140+
backoffs
141+
.get(topic)
142+
.and_then(|m| m.get(peer).map(|(i, _)| *i))
143+
}
144+
145+
/// Applies a heartbeat. That should be called regularly in intervals of length
146+
/// `heartbeat_interval`.
147+
pub(crate) fn heartbeat(&mut self) {
148+
// Clean up backoffs_by_heartbeat
149+
if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
150+
let backoffs = &mut self.backoffs;
151+
let slack = self.heartbeat_interval * self.backoff_slack;
152+
let now = Instant::now();
153+
s.retain(|(topic, peer)| {
154+
let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) {
155+
Some(backoff_time) => backoff_time + slack > now,
156+
None => false,
157+
};
158+
if !keep {
159+
//remove from backoffs
160+
if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
161+
if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
162+
m.remove();
163+
}
164+
}
165+
}
166+
167+
keep
168+
});
169+
}
170+
171+
// Increase heartbeat index
172+
self.heartbeat_index =
173+
HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
174+
}
175+
}

0 commit comments

Comments
 (0)