Skip to content

Commit c0263ca

Browse files
authored
Don't subscribe in AttestationTopicSubscriber if subscription is outdated (#8837)
1 parent 0e86c08 commit c0263ca

File tree

2 files changed

+67
-4
lines changed

2 files changed

+67
-4
lines changed

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriber.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import it.unimi.dsi.fastutil.ints.IntSet;
2222
import java.util.Iterator;
2323
import java.util.Set;
24+
import java.util.concurrent.atomic.AtomicReference;
2425
import org.apache.logging.log4j.LogManager;
2526
import org.apache.logging.log4j.Logger;
2627
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
@@ -39,6 +40,7 @@ public class AttestationTopicSubscriber implements SlotEventsChannel {
3940
private final Eth2P2PNetwork eth2P2PNetwork;
4041
private final Spec spec;
4142
private final SettableLabelledGauge subnetSubscriptionsGauge;
43+
private final AtomicReference<UInt64> currentSlot = new AtomicReference<>(null);
4244

4345
public AttestationTopicSubscriber(
4446
final Spec spec,
@@ -56,6 +58,15 @@ public synchronized void subscribeToCommitteeForAggregation(
5658
aggregationSlot, UInt64.valueOf(committeeIndex), committeesAtSlot);
5759
final UInt64 currentUnsubscriptionSlot = subnetIdToUnsubscribeSlot.getOrDefault(subnetId, ZERO);
5860
final UInt64 unsubscribeSlot = currentUnsubscriptionSlot.max(aggregationSlot);
61+
final UInt64 maybeCurrentSlot = currentSlot.get();
62+
if (maybeCurrentSlot != null && unsubscribeSlot.isLessThan(maybeCurrentSlot)) {
63+
LOG.trace(
64+
"Skipping outdated aggregation subnet {} with unsubscribe due at slot {}",
65+
subnetId,
66+
unsubscribeSlot);
67+
return;
68+
}
69+
5970
if (currentUnsubscriptionSlot.equals(ZERO)) {
6071
eth2P2PNetwork.subscribeToAttestationSubnetId(subnetId);
6172
toggleAggregateSubscriptionMetric(subnetId, false);
@@ -96,15 +107,25 @@ public synchronized void subscribeToPersistentSubnets(
96107
boolean shouldUpdateENR = false;
97108

98109
for (SubnetSubscription subnetSubscription : newSubscriptions) {
99-
int subnetId = subnetSubscription.subnetId();
110+
final int subnetId = subnetSubscription.subnetId();
111+
final UInt64 maybeCurrentSlot = currentSlot.get();
112+
if (maybeCurrentSlot != null
113+
&& subnetSubscription.unsubscriptionSlot().isLessThan(maybeCurrentSlot)) {
114+
LOG.trace(
115+
"Skipping outdated persistent subnet {} with unsubscribe due at slot {}",
116+
subnetId,
117+
subnetSubscription.unsubscriptionSlot());
118+
continue;
119+
}
120+
100121
shouldUpdateENR = persistentSubnetIdSet.add(subnetId) || shouldUpdateENR;
101122
LOG.trace(
102123
"Subscribing to persistent subnet {} with unsubscribe due at slot {}",
103124
subnetId,
104125
subnetSubscription.unsubscriptionSlot());
105126
if (subnetIdToUnsubscribeSlot.containsKey(subnetId)) {
106-
UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot.get(subnetId);
107-
UInt64 unsubscriptionSlot =
127+
final UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot.get(subnetId);
128+
final UInt64 unsubscriptionSlot =
108129
existingUnsubscriptionSlot.max(subnetSubscription.unsubscriptionSlot());
109130
LOG.trace(
110131
"Already subscribed to subnet {}, updating unsubscription slot to {}",
@@ -127,14 +148,15 @@ public synchronized void subscribeToPersistentSubnets(
127148

128149
@Override
129150
public synchronized void onSlot(final UInt64 slot) {
151+
currentSlot.set(slot);
130152
boolean shouldUpdateENR = false;
131153

132154
final Iterator<Int2ObjectMap.Entry<UInt64>> iterator =
133155
subnetIdToUnsubscribeSlot.int2ObjectEntrySet().iterator();
134156
while (iterator.hasNext()) {
135157
final Int2ObjectMap.Entry<UInt64> entry = iterator.next();
136158
if (entry.getValue().compareTo(slot) < 0) {
137-
int subnetId = entry.getIntKey();
159+
final int subnetId = entry.getIntKey();
138160
LOG.trace("Unsubscribing from subnet {}", subnetId);
139161
eth2P2PNetwork.unsubscribeFromAttestationSubnetId(subnetId);
140162
if (persistentSubnetIdSet.contains(subnetId)) {

networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriberTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.assertj.core.api.Assertions.assertThat;
1717
import static org.mockito.ArgumentMatchers.anyInt;
18+
import static org.mockito.ArgumentMatchers.eq;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
@@ -47,6 +48,7 @@ public void shouldSubscribeToSubnet() {
4748
final int committeeId = 10;
4849
final int subnetId =
4950
spec.computeSubnetForCommittee(ONE, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
51+
subscriber.onSlot(ONE);
5052
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, ONE);
5153

5254
verify(settableLabelledGaugeMock)
@@ -155,13 +157,52 @@ public void shouldPreserveLaterSubscriptionPeriodWhenEarlierSlotAdded() {
155157
verify(eth2P2PNetwork).unsubscribeFromAttestationSubnetId(subnetId);
156158
}
157159

160+
@Test
161+
public void shouldNotSubscribeForExpiredAggregationSubnet() {
162+
final int committeeId = 3;
163+
final UInt64 slot = UInt64.valueOf(10);
164+
final int subnetId =
165+
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
166+
// Sanity check second subscription is for the same subnet ID.
167+
assertThat(subnetId)
168+
.isEqualTo(
169+
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT));
170+
171+
subscriber.onSlot(slot.plus(ONE));
172+
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, slot);
173+
verifyNoMoreInteractions(settableLabelledGaugeMock);
174+
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(anyInt());
175+
verify(eth2P2PNetwork, never()).unsubscribeFromAttestationSubnetId(anyInt());
176+
}
177+
178+
@Test
179+
public void shouldNotSubscribeForExpiredPersistentSubnet() {
180+
Set<SubnetSubscription> subnetSubscriptions =
181+
Set.of(
182+
new SubnetSubscription(2, UInt64.valueOf(15)),
183+
new SubnetSubscription(1, UInt64.valueOf(20)));
184+
185+
subscriber.onSlot(UInt64.valueOf(16));
186+
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);
187+
188+
verify(settableLabelledGaugeMock)
189+
.set(1, String.format(AttestationTopicSubscriber.GAUGE_PERSISTENT_SUBNETS_LABEL, 1));
190+
verifyNoMoreInteractions(settableLabelledGaugeMock);
191+
192+
verify(eth2P2PNetwork).setLongTermAttestationSubnetSubscriptions(IntSet.of(1));
193+
194+
verify(eth2P2PNetwork).subscribeToAttestationSubnetId(1);
195+
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(eq(2));
196+
}
197+
158198
@Test
159199
public void shouldSubscribeToNewSubnetsAndUpdateENR_forPersistentSubscriptions() {
160200
Set<SubnetSubscription> subnetSubscriptions =
161201
Set.of(
162202
new SubnetSubscription(1, UInt64.valueOf(20)),
163203
new SubnetSubscription(2, UInt64.valueOf(15)));
164204

205+
subscriber.onSlot(UInt64.valueOf(15));
165206
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);
166207

167208
verify(settableLabelledGaugeMock)

0 commit comments

Comments
 (0)