Skip to content

Commit ba8bcf4

Browse files
committed
Remove deficit gossipsub scoring during topic transition (#4486)
## Issue Addressed This PR closes #3237 ## Proposed Changes Remove topic weight of old topics when the fork happens. ## Additional Info - Divided `NetworkService::start()` into `NetworkService::build()` and `NetworkService::start()` for ease of testing.
1 parent 6ec649a commit ba8bcf4

File tree

3 files changed

+204
-7
lines changed

3 files changed

+204
-7
lines changed

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettin
2626
use libp2p::bandwidth::BandwidthSinks;
2727
use libp2p::gossipsub::{
2828
self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
29+
TopicScoreParams,
2930
};
3031
use libp2p::identify;
3132
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
@@ -618,6 +619,38 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
618619
}
619620
}
620621

622+
/// Remove topic weight from all topics that don't have the given fork digest.
623+
pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) {
624+
let new_param = TopicScoreParams {
625+
topic_weight: 0.0,
626+
..Default::default()
627+
};
628+
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
629+
for topic in subscriptions
630+
.iter()
631+
.filter(|topic| topic.fork_digest != except)
632+
{
633+
let libp2p_topic: Topic = topic.clone().into();
634+
match self
635+
.gossipsub_mut()
636+
.set_topic_params(libp2p_topic, new_param.clone())
637+
{
638+
Ok(_) => debug!(self.log, "Removed topic weight"; "topic" => %topic),
639+
Err(e) => {
640+
warn!(self.log, "Failed to remove topic weight"; "topic" => %topic, "error" => e)
641+
}
642+
}
643+
}
644+
}
645+
646+
/// Returns the scoring parameters for a topic if set.
647+
pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> {
648+
self.swarm
649+
.behaviour()
650+
.gossipsub
651+
.get_topic_params(&topic.into())
652+
}
653+
621654
/// Subscribes to a gossipsub topic.
622655
///
623656
/// Returns `true` if the subscription was successful and `false` otherwise.

beacon_node/network/src/service.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,18 @@ pub struct NetworkService<T: BeaconChainTypes> {
215215
}
216216

217217
impl<T: BeaconChainTypes> NetworkService<T> {
218-
#[allow(clippy::type_complexity)]
219-
pub async fn start(
218+
async fn build(
220219
beacon_chain: Arc<BeaconChain<T>>,
221220
config: &NetworkConfig,
222221
executor: task_executor::TaskExecutor,
223222
gossipsub_registry: Option<&'_ mut Registry>,
224223
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
225224
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
226-
) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
225+
) -> error::Result<(
226+
NetworkService<T>,
227+
Arc<NetworkGlobals<T::EthSpec>>,
228+
NetworkSenders<T::EthSpec>,
229+
)> {
227230
let network_log = executor.log().clone();
228231
// build the channels for external comms
229232
let (network_senders, network_recievers) = NetworkSenders::new();
@@ -369,6 +372,28 @@ impl<T: BeaconChainTypes> NetworkService<T> {
369372
enable_light_client_server: config.enable_light_client_server,
370373
};
371374

375+
Ok((network_service, network_globals, network_senders))
376+
}
377+
378+
#[allow(clippy::type_complexity)]
379+
pub async fn start(
380+
beacon_chain: Arc<BeaconChain<T>>,
381+
config: &NetworkConfig,
382+
executor: task_executor::TaskExecutor,
383+
gossipsub_registry: Option<&'_ mut Registry>,
384+
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
385+
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
386+
) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
387+
let (network_service, network_globals, network_senders) = Self::build(
388+
beacon_chain,
389+
config,
390+
executor.clone(),
391+
gossipsub_registry,
392+
beacon_processor_send,
393+
beacon_processor_reprocess_tx,
394+
)
395+
.await?;
396+
372397
network_service.spawn_service(executor);
373398

374399
Ok((network_globals, network_senders))
@@ -882,9 +907,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
882907

883908
fn update_next_fork(&mut self) {
884909
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
910+
let new_fork_digest = new_enr_fork_id.fork_digest;
885911

886912
let fork_context = &self.fork_context;
887-
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
913+
if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) {
888914
info!(
889915
self.log,
890916
"Transitioned to new fork";
@@ -907,6 +933,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
907933
Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
908934
self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
909935
info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);
936+
937+
// Remove topic weight from old fork topics to prevent peers that left on the mesh on
938+
// old topics from being penalized for not sending us messages.
939+
self.libp2p.remove_topic_weight_except(new_fork_digest);
910940
} else {
911941
crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
912942
}

beacon_node/network/src/service/tests.rs

Lines changed: 137 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,26 @@ mod tests {
44
use crate::persisted_dht::load_dht;
55
use crate::{NetworkConfig, NetworkService};
66
use beacon_chain::test_utils::BeaconChainHarness;
7-
use beacon_processor::BeaconProcessorChannels;
8-
use lighthouse_network::Enr;
7+
use beacon_chain::BeaconChainTypes;
8+
use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig};
9+
use futures::StreamExt;
10+
use lighthouse_network::types::{GossipEncoding, GossipKind};
11+
use lighthouse_network::{Enr, GossipTopic};
912
use slog::{o, Drain, Level, Logger};
1013
use sloggers::{null::NullLoggerBuilder, Build};
1114
use std::str::FromStr;
1215
use std::sync::Arc;
1316
use tokio::runtime::Runtime;
14-
use types::MinimalEthSpec;
17+
use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};
18+
19+
impl<T: BeaconChainTypes> NetworkService<T> {
20+
fn get_topic_params(
21+
&self,
22+
topic: GossipTopic,
23+
) -> Option<&lighthouse_network::libp2p::gossipsub::TopicScoreParams> {
24+
self.libp2p.get_topic_params(topic)
25+
}
26+
}
1527

1628
fn get_logger(actual_log: bool) -> Logger {
1729
if actual_log {
@@ -102,4 +114,126 @@ mod tests {
102114
"should have persisted the second ENR to store"
103115
);
104116
}
117+
118+
// Test removing topic weight on old topics when a fork happens.
119+
#[test]
120+
fn test_removing_topic_weight_on_old_topics() {
121+
let runtime = Arc::new(Runtime::new().unwrap());
122+
123+
// Capella spec
124+
let mut spec = MinimalEthSpec::default_spec();
125+
spec.altair_fork_epoch = Some(Epoch::new(0));
126+
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
127+
spec.capella_fork_epoch = Some(Epoch::new(1));
128+
129+
// Build beacon chain.
130+
let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
131+
.spec(spec.clone())
132+
.deterministic_keypairs(8)
133+
.fresh_ephemeral_store()
134+
.mock_execution_layer()
135+
.build()
136+
.chain;
137+
let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
138+
assert_eq!(next_fork_name, ForkName::Capella);
139+
140+
// Build network service.
141+
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
142+
let (_, exit) = exit_future::signal();
143+
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
144+
let executor = task_executor::TaskExecutor::new(
145+
Arc::downgrade(&runtime),
146+
exit,
147+
get_logger(false),
148+
shutdown_tx,
149+
);
150+
151+
let mut config = NetworkConfig::default();
152+
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
153+
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
154+
config.upnp_enabled = false;
155+
156+
let beacon_processor_channels =
157+
BeaconProcessorChannels::new(&BeaconProcessorConfig::default());
158+
NetworkService::build(
159+
beacon_chain.clone(),
160+
&config,
161+
executor.clone(),
162+
None,
163+
beacon_processor_channels.beacon_processor_tx,
164+
beacon_processor_channels.work_reprocessing_tx,
165+
)
166+
.await
167+
.unwrap()
168+
});
169+
170+
// Subscribe to the topics.
171+
runtime.block_on(async {
172+
while network_globals.gossipsub_subscriptions.read().len() < 2 {
173+
if let Some(msg) = network_service.attestation_service.next().await {
174+
network_service.on_attestation_service_msg(msg);
175+
}
176+
}
177+
});
178+
179+
// Make sure the service is subscribed to the topics.
180+
let (old_topic1, old_topic2) = {
181+
let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
182+
network_globals.local_enr().node_id().raw().into(),
183+
beacon_chain.epoch().unwrap(),
184+
&spec,
185+
)
186+
.unwrap()
187+
.0
188+
.collect::<Vec<_>>();
189+
assert_eq!(2, subnets.len());
190+
191+
let old_fork_digest = beacon_chain.enr_fork_id().fork_digest;
192+
let old_topic1 = GossipTopic::new(
193+
GossipKind::Attestation(subnets.pop().unwrap()),
194+
GossipEncoding::SSZSnappy,
195+
old_fork_digest,
196+
);
197+
let old_topic2 = GossipTopic::new(
198+
GossipKind::Attestation(subnets.pop().unwrap()),
199+
GossipEncoding::SSZSnappy,
200+
old_fork_digest,
201+
);
202+
203+
(old_topic1, old_topic2)
204+
};
205+
let subscriptions = network_globals.gossipsub_subscriptions.read().clone();
206+
assert_eq!(2, subscriptions.len());
207+
assert!(subscriptions.contains(&old_topic1));
208+
assert!(subscriptions.contains(&old_topic2));
209+
let old_topic_params1 = network_service
210+
.get_topic_params(old_topic1.clone())
211+
.expect("topic score params");
212+
assert!(old_topic_params1.topic_weight > 0.0);
213+
let old_topic_params2 = network_service
214+
.get_topic_params(old_topic2.clone())
215+
.expect("topic score params");
216+
assert!(old_topic_params2.topic_weight > 0.0);
217+
218+
// Advance slot to the next fork
219+
for _ in 0..MinimalEthSpec::slots_per_epoch() {
220+
beacon_chain.slot_clock.advance_slot();
221+
}
222+
223+
// Run `NetworkService::update_next_fork()`.
224+
runtime.block_on(async {
225+
network_service.update_next_fork();
226+
});
227+
228+
// Check that topic_weight on the old topics has been zeroed.
229+
let old_topic_params1 = network_service
230+
.get_topic_params(old_topic1)
231+
.expect("topic score params");
232+
assert_eq!(0.0, old_topic_params1.topic_weight);
233+
234+
let old_topic_params2 = network_service
235+
.get_topic_params(old_topic2)
236+
.expect("topic score params");
237+
assert_eq!(0.0, old_topic_params2.topic_weight);
238+
}
105239
}

0 commit comments

Comments
 (0)