Skip to content

Commit 6cba4f6

Browse files
Throttle outgoing requests by both peer and protocol id (#8969)
1 parent c0a17cf commit 6cba4f6

File tree

7 files changed

+128
-65
lines changed

7 files changed

+128
-65
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
- Remove delay when fetching blobs from the local EL on block arrival
1414

1515
### Bug Fixes
16-
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
16+
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
17+
- Fix issue (introduced in `24.12.1`) with peer stability when the upperbound is set to a high number

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/methods/Eth2RpcMethod.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,5 @@ public interface Eth2RpcMethod<TRequest extends RpcRequest & SszData, TResponse
4444

4545
@Override
4646
Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHandler(
47-
String protocolId,
48-
final TRequest request,
49-
Eth2RpcResponseHandler<TResponse, ?> responseHandler);
47+
String protocolId, TRequest request, Eth2RpcResponseHandler<TResponse, ?> responseHandler);
5048
}

networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_OPEN_STREAMS_RATE_LIMIT;
1717
import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT;
18-
import static tech.pegasys.teku.spec.constants.NetworkConstants.MAX_CONCURRENT_REQUESTS;
1918

2019
import com.google.common.base.Preconditions;
2120
import identify.pb.IdentifyOuterClass;
@@ -153,9 +152,7 @@ public P2PNetwork<Peer> build() {
153152
}
154153

155154
protected List<? extends RpcHandler<?, ?, ?>> createRpcHandlers() {
156-
return rpcMethods.stream()
157-
.map(m -> new RpcHandler<>(asyncRunner, m, MAX_CONCURRENT_REQUESTS))
158-
.toList();
155+
return rpcMethods.stream().map(m -> new RpcHandler<>(asyncRunner, m)).toList();
159156
}
160157

161158
protected LibP2PGossipNetwork createGossipNetwork() {

networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.libp2p.core.PeerId;
1919
import io.libp2p.core.crypto.PubKey;
2020
import io.libp2p.protocol.Identify;
21+
import io.libp2p.protocol.IdentifyController;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Optional;
@@ -28,6 +29,7 @@
2829
import org.apache.logging.log4j.LogManager;
2930
import org.apache.logging.log4j.Logger;
3031
import tech.pegasys.teku.infrastructure.async.SafeFuture;
32+
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
3133
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
3234
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
3335
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
@@ -41,11 +43,12 @@
4143
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
4244
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
4345
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
46+
import tech.pegasys.teku.spec.constants.NetworkConstants;
4447

4548
public class LibP2PPeer implements Peer {
4649
private static final Logger LOG = LogManager.getLogger();
4750

48-
private final Map<RpcMethod<?, ?, ?>, RpcHandler<?, ?, ?>> rpcHandlers;
51+
private final Map<RpcMethod<?, ?, ?>, ThrottlingRpcHandler<?, ?, ?>> rpcHandlers;
4952
private final ReputationManager reputationManager;
5053
private final Function<PeerId, Double> peerScoreFunction;
5154
private final Connection connection;
@@ -71,7 +74,8 @@ public LibP2PPeer(
7174
final Function<PeerId, Double> peerScoreFunction) {
7275
this.connection = connection;
7376
this.rpcHandlers =
74-
rpcHandlers.stream().collect(Collectors.toMap(RpcHandler::getRpcMethod, h -> h));
77+
rpcHandlers.stream()
78+
.collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new));
7579
this.reputationManager = reputationManager;
7680
this.peerScoreFunction = peerScoreFunction;
7781
this.peerId = connection.secureSession().getRemoteId();
@@ -109,10 +113,6 @@ private PeerClientType getPeerTypeFromAgentString(final String agentVersion) {
109113
return EnumUtils.getEnumIgnoreCase(PeerClientType.class, agent, PeerClientType.UNKNOWN);
110114
}
111115

112-
public Optional<String> getMaybeAgentString() {
113-
return maybeAgentString;
114-
}
115-
116116
public PubKey getPubKey() {
117117
return pubKey;
118118
}
@@ -161,7 +161,7 @@ private SafeFuture<IdentifyOuterClass.Identify> getIdentify() {
161161
.muxerSession()
162162
.createStream(new Identify())
163163
.getController()
164-
.thenCompose(controller -> controller.id()))
164+
.thenCompose(IdentifyController::id))
165165
.exceptionallyCompose(
166166
error -> {
167167
LOG.debug("Failed to get peer identity", error);
@@ -208,8 +208,8 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
208208
final TRequest request,
209209
final RespHandler responseHandler) {
210210
@SuppressWarnings("unchecked")
211-
RpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
212-
(RpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
211+
final ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
212+
(ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
213213
if (rpcHandler == null) {
214214
throw new IllegalArgumentException(
215215
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));
@@ -240,4 +240,26 @@ public void adjustReputation(final ReputationAdjustment adjustment) {
240240
disconnectCleanly(DisconnectReason.REMOTE_FAULT).ifExceptionGetsHereRaiseABug();
241241
}
242242
}
243+
244+
private static class ThrottlingRpcHandler<
245+
TOutgoingHandler extends RpcRequestHandler,
246+
TRequest,
247+
TRespHandler extends RpcResponseHandler<?>> {
248+
249+
private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;
250+
251+
private final ThrottlingTaskQueue requestsQueue =
252+
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);
253+
254+
private ThrottlingRpcHandler(
255+
final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
256+
this.delegate = delegate;
257+
}
258+
259+
private SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
260+
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
261+
return requestsQueue.queueTask(
262+
() -> delegate.sendRequest(connection, request, responseHandler));
263+
}
264+
}
243265
}

networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandler.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
3939
import tech.pegasys.teku.infrastructure.async.SafeFuture;
4040
import tech.pegasys.teku.infrastructure.async.SafeFuture.Interruptor;
41-
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
4241
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
4342
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
4443
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler.Controller;
@@ -63,15 +62,12 @@ public class RpcHandler<
6362

6463
private final AsyncRunner asyncRunner;
6564
private final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod;
66-
private final ThrottlingTaskQueue concurrentRequestsQueue;
6765

6866
public RpcHandler(
6967
final AsyncRunner asyncRunner,
70-
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod,
71-
final int maxConcurrentRequests) {
68+
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod) {
7269
this.asyncRunner = asyncRunner;
7370
this.rpcMethod = rpcMethod;
74-
concurrentRequestsQueue = ThrottlingTaskQueue.create(maxConcurrentRequests);
7571
}
7672

7773
public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {
@@ -80,13 +76,6 @@ public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {
8076

8177
public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
8278
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
83-
return concurrentRequestsQueue.queueTask(
84-
() -> sendRequestInternal(connection, request, responseHandler));
85-
}
86-
87-
public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequestInternal(
88-
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
89-
9079
final Bytes initialPayload;
9180
try {
9281
initialPayload = rpcMethod.encodeRequest(request);
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Consensys Software Inc., 2025
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package tech.pegasys.teku.networking.p2p.libp2p;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.mockito.Mockito.mock;
18+
import static org.mockito.Mockito.when;
19+
20+
import io.libp2p.core.Connection;
21+
import io.libp2p.core.security.SecureChannel.Session;
22+
import java.util.List;
23+
import java.util.stream.IntStream;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import tech.pegasys.teku.infrastructure.async.SafeFuture;
27+
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
28+
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
29+
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
30+
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
31+
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
32+
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
33+
import tech.pegasys.teku.spec.constants.NetworkConstants;
34+
35+
public class LibP2PPeerTest {
36+
37+
private final Connection connection = mock(Connection.class);
38+
39+
@SuppressWarnings("unchecked")
40+
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcHandler =
41+
mock(RpcHandler.class);
42+
43+
@SuppressWarnings("unchecked")
44+
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
45+
mock(RpcMethod.class);
46+
47+
private LibP2PPeer libP2PPeer;
48+
49+
@BeforeEach
50+
public void init() {
51+
when(rpcHandler.getRpcMethod()).thenReturn(rpcMethod);
52+
final Session secureSession = mock(Session.class);
53+
when(connection.secureSession()).thenReturn(secureSession);
54+
when(connection.closeFuture()).thenReturn(new SafeFuture<>());
55+
libP2PPeer =
56+
new LibP2PPeer(connection, List.of(rpcHandler), ReputationManager.NOOP, peer -> 0.0);
57+
}
58+
59+
@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
60+
@Test
61+
public void sendRequest_throttlesRequests() {
62+
63+
// fill the queue with incomplete futures
64+
final List<SafeFuture<RpcStreamController<RpcRequestHandler>>> queuedFutures =
65+
IntStream.range(0, NetworkConstants.MAX_CONCURRENT_REQUESTS)
66+
.mapToObj(
67+
__ -> {
68+
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
69+
new SafeFuture<>();
70+
when(rpcHandler.sendRequest(connection, null, null)).thenReturn(future);
71+
libP2PPeer.sendRequest(rpcMethod, null, null);
72+
return future;
73+
})
74+
.toList();
75+
76+
when(rpcHandler.sendRequest(connection, null, null))
77+
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));
78+
79+
final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
80+
libP2PPeer.sendRequest(rpcMethod, null, null);
81+
82+
// completed request should be throttled
83+
assertThat(throttledRequest).isNotDone();
84+
85+
// empty the queue
86+
queuedFutures.forEach(future -> future.complete(mock(RpcStreamController.class)));
87+
88+
// throttled request should have completed now
89+
assertThat(throttledRequest).isDone();
90+
}
91+
}

networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/RpcHandlerTest.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.libp2p.core.mux.StreamMuxer.Session;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.atomic.AtomicReference;
33-
import java.util.stream.IntStream;
3433
import kotlin.Unit;
3534
import org.junit.jupiter.api.BeforeEach;
3635
import org.junit.jupiter.api.Test;
@@ -53,9 +52,8 @@ public class RpcHandlerTest {
5352

5453
StubAsyncRunner asyncRunner = new StubAsyncRunner();
5554
RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcMethod = mock(RpcMethod.class);
56-
int maxConcurrentRequests = 2;
5755
RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcHandler =
58-
new RpcHandler<>(asyncRunner, rpcMethod, maxConcurrentRequests);
56+
new RpcHandler<>(asyncRunner, rpcMethod);
5957

6058
Connection connection = mock(Connection.class);
6159
Session session = mock(Session.class);
@@ -249,39 +247,6 @@ void sendRequest_interruptBeforeInitialPayloadWritten(
249247
verify(stream).close();
250248
}
251249

252-
@Test
253-
@SuppressWarnings("FutureReturnValueIgnored")
254-
void requestIsThrottledIfQueueIsFull() {
255-
// fill the queue
256-
IntStream.range(0, maxConcurrentRequests)
257-
.forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler));
258-
259-
final StreamPromise<Controller<RpcRequestHandler>> streamPromise1 =
260-
new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>());
261-
when(session.createStream((ProtocolBinding<Controller<RpcRequestHandler>>) any()))
262-
.thenReturn(streamPromise1);
263-
final Stream stream1 = mock(Stream.class);
264-
streamPromise1.getStream().complete(stream1);
265-
streamPromise1.getController().complete(controller);
266-
final CompletableFuture<String> protocolIdFuture1 = new CompletableFuture<>();
267-
when(stream1.getProtocol()).thenReturn(protocolIdFuture1);
268-
protocolIdFuture1.complete("test");
269-
270-
final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledResult =
271-
rpcHandler.sendRequest(connection, request, responseHandler);
272-
273-
assertThat(throttledResult).isNotDone();
274-
275-
// empty the queue
276-
streamPromise.getStream().complete(stream);
277-
streamPromise.getController().complete(controller);
278-
stream.getProtocol().complete("test");
279-
writeFuture.complete(null);
280-
281-
// throttled request should have completed now
282-
assertThat(throttledResult).isCompleted();
283-
}
284-
285250
@SuppressWarnings("UnnecessaryAsync")
286251
private Class<? extends Exception> executeInterrupts(
287252
final boolean closeStream, final boolean exceedTimeout) {

0 commit comments

Comments
 (0)