@@ -147,7 +147,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
147
147
Err ( ( rate_limited_req, wait_time) ) => {
148
148
let key = ( peer_id, protocol) ;
149
149
self . next_peer_request . insert ( key, wait_time) ;
150
- queued_requests. push_back ( rate_limited_req) ;
150
+ queued_requests. push_front ( rate_limited_req) ;
151
151
// If one fails just wait for the next window that allows sending requests.
152
152
return ;
153
153
}
@@ -205,3 +205,72 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
205
205
Poll :: Pending
206
206
}
207
207
}
208
+
209
+ #[ cfg( test) ]
210
+ mod tests {
211
+ use crate :: rpc:: config:: { OutboundRateLimiterConfig , RateLimiterConfig } ;
212
+ use crate :: rpc:: rate_limiter:: Quota ;
213
+ use crate :: rpc:: self_limiter:: SelfRateLimiter ;
214
+ use crate :: rpc:: { OutboundRequest , Ping , Protocol } ;
215
+ use crate :: service:: api_types:: RequestId ;
216
+ use libp2p:: PeerId ;
217
+ use std:: time:: Duration ;
218
+ use types:: MainnetEthSpec ;
219
+
220
+ /// Test that `next_peer_request_ready` correctly maintains the queue.
221
+ #[ tokio:: test]
222
+ async fn test_next_peer_request_ready ( ) {
223
+ let log = logging:: test_logger ( ) ;
224
+ let config = OutboundRateLimiterConfig ( RateLimiterConfig {
225
+ ping_quota : Quota :: n_every ( 1 , 2 ) ,
226
+ ..Default :: default ( )
227
+ } ) ;
228
+ let mut limiter: SelfRateLimiter < RequestId < u64 > , MainnetEthSpec > =
229
+ SelfRateLimiter :: new ( config, log) . unwrap ( ) ;
230
+ let peer_id = PeerId :: random ( ) ;
231
+
232
+ for i in 1 ..=5 {
233
+ let _ = limiter. allows (
234
+ peer_id,
235
+ RequestId :: Application ( i) ,
236
+ OutboundRequest :: Ping ( Ping { data : i } ) ,
237
+ ) ;
238
+ }
239
+
240
+ {
241
+ let queue = limiter
242
+ . delayed_requests
243
+ . get ( & ( peer_id, Protocol :: Ping ) )
244
+ . unwrap ( ) ;
245
+ assert_eq ! ( 4 , queue. len( ) ) ;
246
+
247
+ // Check that requests in the queue are ordered in the sequence 2, 3, 4, 5.
248
+ let mut iter = queue. iter ( ) ;
249
+ for i in 2 ..=5 {
250
+ assert_eq ! ( iter. next( ) . unwrap( ) . request_id, RequestId :: Application ( i) ) ;
251
+ }
252
+
253
+ assert_eq ! ( limiter. ready_requests. len( ) , 0 ) ;
254
+ }
255
+
256
+ // Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
257
+ tokio:: time:: sleep ( Duration :: from_secs ( 3 ) ) . await ;
258
+ limiter. next_peer_request_ready ( peer_id, Protocol :: Ping ) ;
259
+
260
+ {
261
+ let queue = limiter
262
+ . delayed_requests
263
+ . get ( & ( peer_id, Protocol :: Ping ) )
264
+ . unwrap ( ) ;
265
+ assert_eq ! ( 3 , queue. len( ) ) ;
266
+
267
+ // Check that requests in the queue are ordered in the sequence 3, 4, 5.
268
+ let mut iter = queue. iter ( ) ;
269
+ for i in 3 ..=5 {
270
+ assert_eq ! ( iter. next( ) . unwrap( ) . request_id, RequestId :: Application ( i) ) ;
271
+ }
272
+
273
+ assert_eq ! ( limiter. ready_requests. len( ) , 1 ) ;
274
+ }
275
+ }
276
+ }
0 commit comments