Skip to content

Commit e580278

Browse files
committed
Cleaner logic for gossip subscriptions for new forks (#4030)
## Issue Addressed Cleaner resolution for #4006 ## Proposed Changes We are currently subscribing to core topics of new forks way before the actual fork since we had just a single `CORE_TOPICS` array. This PR separates the core topics for every fork and subscribes to only required topics based on the current fork. Also adds logic for subscribing to the core topics of a new fork only 2 slots before the fork happens. 2 slots is to give enough time for the gossip meshes to form. Currently doesn't add logic to remove topics from older forks in new forks. For e.g. in the coupled 4844 world, we had to remove the `BeaconBlock` topic in favour of `BeaconBlocksAndBlobsSidecar` at the 4844 fork. It should be easy enough to add though. Not adding it because I'm assuming that #4019 will get merged before this PR and we won't require any deletion logic. Happy to add it regardless though.
1 parent 047c754 commit e580278

File tree

4 files changed

+60
-13
lines changed

4 files changed

+60
-13
lines changed

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use crate::rpc::*;
1313
use crate::service::behaviour::BehaviourEvent;
1414
pub use crate::service::behaviour::Gossipsub;
1515
use crate::types::{
16-
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
17-
SubnetDiscovery,
16+
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
17+
SnappyTransform, Subnet, SubnetDiscovery,
1818
};
1919
use crate::EnrExt;
2020
use crate::Eth2Enr;
@@ -41,6 +41,7 @@ use std::{
4141
sync::Arc,
4242
task::{Context, Poll},
4343
};
44+
use types::ForkName;
4445
use types::{
4546
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
4647
};
@@ -559,13 +560,20 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
559560
self.unsubscribe(gossip_topic)
560561
}
561562

562-
/// Subscribe to all currently subscribed topics with the new fork digest.
563-
pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) {
563+
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
564+
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
565+
// Subscribe to existing topics with new fork digest
564566
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
565567
for mut topic in subscriptions.into_iter() {
566568
topic.fork_digest = new_fork_digest;
567569
self.subscribe(topic);
568570
}
571+
572+
// Subscribe to core topics for the new fork
573+
for kind in fork_core_topics(&new_fork) {
574+
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
575+
self.subscribe(topic);
576+
}
569577
}
570578

571579
/// Unsubscribe from all topics that doesn't have the given fork_digest

beacon_node/lighthouse_network/src/types/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
1717
pub use subnet::{Subnet, SubnetDiscovery};
1818
pub use sync_state::{BackFillState, SyncState};
1919
pub use topics::{
20-
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS,
21-
LIGHT_CLIENT_GOSSIP_TOPICS,
20+
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
21+
GossipTopic, LIGHT_CLIENT_GOSSIP_TOPICS,
2222
};

beacon_node/lighthouse_network/src/types/topics.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use libp2p::gossipsub::{IdentTopic as Topic, TopicHash};
22
use serde_derive::{Deserialize, Serialize};
33
use strum::AsRefStr;
4-
use types::{SubnetId, SyncSubnetId};
4+
use types::{ForkName, SubnetId, SyncSubnetId};
55

66
use crate::Subnet;
77

@@ -22,21 +22,48 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
2222
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
2323
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
2424

25-
pub const CORE_TOPICS: [GossipKind; 7] = [
25+
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
2626
GossipKind::BeaconBlock,
2727
GossipKind::BeaconAggregateAndProof,
2828
GossipKind::VoluntaryExit,
2929
GossipKind::ProposerSlashing,
3030
GossipKind::AttesterSlashing,
31-
GossipKind::SignedContributionAndProof,
32-
GossipKind::BlsToExecutionChange,
3331
];
3432

33+
pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];
34+
35+
pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];
36+
37+
pub const EIP4844_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BeaconBlocksAndBlobsSidecar];
38+
3539
pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
3640
GossipKind::LightClientFinalityUpdate,
3741
GossipKind::LightClientOptimisticUpdate,
3842
];
3943

44+
/// Returns the core topics associated with each fork that are new to the previous fork
45+
pub fn fork_core_topics(fork_name: &ForkName) -> Vec<GossipKind> {
46+
match fork_name {
47+
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
48+
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
49+
ForkName::Merge => vec![],
50+
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
51+
ForkName::Eip4844 => EIP4844_CORE_TOPICS.to_vec(),
52+
}
53+
}
54+
55+
/// Returns all the topics that we need to subscribe to for a given fork
56+
/// including topics from older forks and new topics for the current fork.
57+
pub fn core_topics_to_subscribe(mut current_fork: ForkName) -> Vec<GossipKind> {
58+
let mut topics = fork_core_topics(&current_fork);
59+
while let Some(previous_fork) = current_fork.previous_fork() {
60+
let previous_fork_topics = fork_core_topics(&previous_fork);
61+
topics.extend(previous_fork_topics);
62+
current_fork = previous_fork;
63+
}
64+
topics
65+
}
66+
4067
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
4168
/// the pubsub protocol and the way the messages should be encoded.
4269
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
@@ -390,4 +417,16 @@ mod tests {
390417
assert_eq!("proposer_slashing", ProposerSlashing.as_ref());
391418
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
392419
}
420+
421+
#[test]
422+
fn test_core_topics_to_subscribe() {
423+
let mut all_topics = Vec::new();
424+
all_topics.extend(EIP4844_CORE_TOPICS);
425+
all_topics.extend(CAPELLA_CORE_TOPICS);
426+
all_topics.extend(ALTAIR_CORE_TOPICS);
427+
all_topics.extend(BASE_CORE_TOPICS);
428+
429+
let latest_fork = *ForkName::list_all().last().unwrap();
430+
assert_eq!(core_topics_to_subscribe(latest_fork), all_topics);
431+
}
393432
}

beacon_node/network/src/service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use lighthouse_network::{
1919
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
2020
};
2121
use lighthouse_network::{
22-
types::{GossipEncoding, GossipTopic},
22+
types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
2323
MessageId, NetworkEvent, NetworkGlobals, PeerId,
2424
};
2525
use slog::{crit, debug, error, info, o, trace, warn};
@@ -445,7 +445,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
445445
let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
446446
let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
447447
info!(self.log, "Subscribing to new fork topics");
448-
self.libp2p.subscribe_new_fork_topics(fork_digest);
448+
self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest);
449449
self.next_fork_subscriptions = Box::pin(None.into());
450450
}
451451
else {
@@ -684,7 +684,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
684684
}
685685

686686
let mut subscribed_topics: Vec<GossipTopic> = vec![];
687-
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
687+
for topic_kind in core_topics_to_subscribe(self.fork_context.current_fork()) {
688688
for fork_digest in self.required_gossip_fork_digests() {
689689
let topic = GossipTopic::new(
690690
topic_kind.clone(),

0 commit comments

Comments
 (0)