Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions beacon_node/network/src/subnet_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct SubnetService<T: BeaconChainTypes> {
subscriptions: HashSetDelay<Subnet>,

/// Subscriptions that need to be executed in the future.
scheduled_subscriptions: HashSetDelay<Subnet>,
scheduled_subscriptions: HashSetDelay<ExactSubnet>,

/// A list of permanent subnets that this node is subscribed to.
// TODO: Shift this to a dynamic bitfield
Expand Down Expand Up @@ -484,8 +484,10 @@ impl<T: BeaconChainTypes> SubnetService<T> {
self.subscribe_to_subnet_immediately(subnet, slot + 1)?;
} else {
// This is a future slot, schedule subscribing.
// We need to include the slot to make the key unique to prevent overwriting the entry
// for the same subnet.
self.scheduled_subscriptions
.insert_at(subnet, time_to_subscription_start);
.insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start);
}

Ok(())
Expand Down Expand Up @@ -626,7 +628,8 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
// Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription.
match self.scheduled_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(subnet))) => {
Poll::Ready(Some(Ok(exact_subnet))) => {
let ExactSubnet { subnet, .. } = exact_subnet;
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
Expand Down
55 changes: 51 additions & 4 deletions beacon_node/network/src/subnet_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,12 +500,15 @@ mod test {
// subscription config
let committee_count = 1;

// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
// Makes 3 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for each of the later slots subscriptions
// (subscription_slot2 and subscription_slot3).
let subscription_slot1 = 0;
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let subscription_slot3 = subscription_slot2 * 2;
let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let com2 = 0;
let com3 = CHAIN.chain.spec.attestation_subnet_count - com1;

// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
Expand All @@ -532,6 +535,13 @@ mod test {
true,
);

let sub3 = get_subscription(
com3,
current_slot + Slot::new(subscription_slot3),
committee_count,
true,
);

let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
Expand All @@ -548,12 +558,23 @@ mod test {
)
.unwrap();

let subnet_id3 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot3),
com3,
committee_count,
&subnet_service.beacon_chain.spec,
)
.unwrap();

// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_ne!(sub1, sub3);
assert_ne!(sub2, sub3);
assert_eq!(subnet_id1, subnet_id2);
assert_eq!(subnet_id1, subnet_id3);

// submit the subscriptions
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter());

// Unsubscription event should happen at the end of the slot.
// We wait for 2 slots, to avoid timeout issues
Expand Down Expand Up @@ -590,10 +611,36 @@ mod test {
// If the permanent and short lived subnets are different, we should get an unsubscription event.
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[expected_subscription, expected_unsubscription],
[
expected_subscription.clone(),
expected_unsubscription.clone(),
],
second_subscribe_event[..]
);
}

let subscription_slot = current_slot + subscription_slot3 - 1;

let wait_slots = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;

let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);

assert_eq!(no_events, []);

let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;

if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[expected_subscription, expected_unsubscription],
third_subscribe_event[..]
);
}
}

#[tokio::test]
Expand Down
Loading