Skip to content

Commit 5ca482f

Browse files
authored
Merge of #5554
2 parents 6bac5ce + 9d82adc commit 5ca482f

File tree

5 files changed

+63
-72
lines changed

5 files changed

+63
-72
lines changed

beacon_node/lighthouse_network/src/rpc/handler.rs

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ struct InboundInfo<E: EthSpec> {
163163
/// Protocol of the original request we received from the peer.
164164
protocol: Protocol,
165165
/// Responses that the peer is still expecting from us.
166-
remaining_chunks: u64,
166+
max_remaining_chunks: u64,
167167
/// Useful to timing how long each request took to process. Currently only used by
168168
/// BlocksByRange.
169169
request_start_time: Instant,
@@ -180,7 +180,7 @@ struct OutboundInfo<Id, E: EthSpec> {
180180
/// Info over the protocol this substream is handling.
181181
proto: Protocol,
182182
/// Number of chunks to be seen from the peer's response.
183-
remaining_chunks: Option<u64>,
183+
max_remaining_chunks: Option<u64>,
184184
/// `Id` as given by the application that sent the request.
185185
req_id: Id,
186186
}
@@ -471,7 +471,7 @@ where
471471
// Process one more message if one exists.
472472
if let Some(message) = info.pending_items.pop_front() {
473473
// If this is the last chunk, terminate the stream.
474-
let last_chunk = info.remaining_chunks <= 1;
474+
let last_chunk = info.max_remaining_chunks <= 1;
475475
let fut =
476476
send_message_to_inbound_substream(substream, message, last_chunk)
477477
.boxed();
@@ -537,7 +537,8 @@ where
537537
{
538538
// The substream is still active, decrement the remaining
539539
// chunks expected.
540-
info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
540+
info.max_remaining_chunks =
541+
info.max_remaining_chunks.saturating_sub(1);
541542

542543
// If this substream has not ended, we reset the timer.
543544
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.
@@ -552,7 +553,7 @@ where
552553
// Process one more message if one exists.
553554
if let Some(message) = info.pending_items.pop_front() {
554555
// If this is the last chunk, terminate the stream.
555-
let last_chunk = info.remaining_chunks <= 1;
556+
let last_chunk = info.max_remaining_chunks <= 1;
556557
let fut = send_message_to_inbound_substream(
557558
substream, message, last_chunk,
558559
)
@@ -664,15 +665,19 @@ where
664665
request,
665666
} => match substream.poll_next_unpin(cx) {
666667
Poll::Ready(Some(Ok(response))) => {
667-
if request.expected_responses() > 1 && !response.close_after() {
668+
if request.expect_exactly_one_response() || response.close_after() {
669+
// either this is a single response request or this response closes the
670+
// stream
671+
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
672+
} else {
668673
let substream_entry = entry.get_mut();
669674
let delay_key = &substream_entry.delay_key;
670675
// chunks left after this one
671-
let remaining_chunks = substream_entry
672-
.remaining_chunks
676+
let max_remaining_chunks = substream_entry
677+
.max_remaining_chunks
673678
.map(|count| count.saturating_sub(1))
674679
.unwrap_or_else(|| 0);
675-
if remaining_chunks == 0 {
680+
if max_remaining_chunks == 0 {
676681
// this is the last expected message, close the stream as all expected chunks have been received
677682
substream_entry.state = OutboundSubstreamState::Closing(substream);
678683
} else {
@@ -682,14 +687,10 @@ where
682687
substream,
683688
request,
684689
};
685-
substream_entry.remaining_chunks = Some(remaining_chunks);
690+
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
686691
self.outbound_substreams_delay
687692
.reset(delay_key, self.resp_timeout);
688693
}
689-
} else {
690-
// either this is a single response request or this response closes the
691-
// stream
692-
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
693694
}
694695

695696
// Check what type of response we got and report it accordingly
@@ -725,7 +726,16 @@ where
725726
self.outbound_substreams_delay.remove(delay_key);
726727
entry.remove_entry();
727728
// notify the application error
728-
if request.expected_responses() > 1 {
729+
if request.expect_exactly_one_response() {
730+
// return an error, stream should not have closed early.
731+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
732+
HandlerEvent::Err(HandlerErr::Outbound {
733+
id: request_id,
734+
proto: request.versioned_protocol().protocol(),
735+
error: RPCError::IncompleteStream,
736+
}),
737+
));
738+
} else {
729739
// return an end of stream result
730740
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
731741
HandlerEvent::Ok(RPCReceived::EndOfStream(
@@ -734,16 +744,6 @@ where
734744
)),
735745
));
736746
}
737-
738-
// else we return an error, stream should not have closed early.
739-
let outbound_err = HandlerErr::Outbound {
740-
id: request_id,
741-
proto: request.versioned_protocol().protocol(),
742-
error: RPCError::IncompleteStream,
743-
};
744-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
745-
HandlerEvent::Err(outbound_err),
746-
));
747747
}
748748
Poll::Pending => {
749749
entry.get_mut().state =
@@ -880,10 +880,10 @@ where
880880
}
881881

882882
let (req, substream) = substream;
883-
let expected_responses = req.expected_responses();
883+
let max_responses = req.max_responses();
884884

885885
// store requests that expect responses
886-
if expected_responses > 0 {
886+
if max_responses > 0 {
887887
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
888888
// Store the stream and tag the output.
889889
let delay_key = self
@@ -894,14 +894,13 @@ where
894894
self.current_inbound_substream_id,
895895
InboundInfo {
896896
state: awaiting_stream,
897-
pending_items: VecDeque::with_capacity(std::cmp::min(
898-
expected_responses,
899-
128,
900-
) as usize),
897+
pending_items: VecDeque::with_capacity(
898+
std::cmp::min(max_responses, 128) as usize
899+
),
901900
delay_key: Some(delay_key),
902901
protocol: req.versioned_protocol().protocol(),
903902
request_start_time: Instant::now(),
904-
remaining_chunks: expected_responses,
903+
max_remaining_chunks: max_responses,
905904
},
906905
);
907906
} else {
@@ -948,8 +947,14 @@ where
948947
}
949948

950949
// add the stream to substreams if we expect a response, otherwise drop the stream.
951-
let expected_responses = request.expected_responses();
952-
if expected_responses > 0 {
950+
let max_responses = request.max_responses();
951+
if max_responses > 0 {
952+
let max_remaining_chunks = if request.expect_exactly_one_response() {
953+
// Currently enforced only for multiple responses
954+
None
955+
} else {
956+
Some(max_responses)
957+
};
953958
// new outbound request. Store the stream and tag the output.
954959
let delay_key = self
955960
.outbound_substreams_delay
@@ -958,12 +963,6 @@ where
958963
substream: Box::new(substream),
959964
request,
960965
};
961-
let expected_responses = if expected_responses > 1 {
962-
// Currently enforced only for multiple responses
963-
Some(expected_responses)
964-
} else {
965-
None
966-
};
967966
if self
968967
.outbound_substreams
969968
.insert(
@@ -972,7 +971,7 @@ where
972971
state: awaiting_stream,
973972
delay_key,
974973
proto,
975-
remaining_chunks: expected_responses,
974+
max_remaining_chunks,
976975
req_id: id,
977976
},
978977
)

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -483,27 +483,6 @@ impl<E: EthSpec> RPCCodedResponse<E> {
483483
RPCCodedResponse::Error(code, err)
484484
}
485485

486-
/// Specifies which response allows for multiple chunks for the stream handler.
487-
pub fn multiple_responses(&self) -> bool {
488-
match self {
489-
RPCCodedResponse::Success(resp) => match resp {
490-
RPCResponse::Status(_) => false,
491-
RPCResponse::BlocksByRange(_) => true,
492-
RPCResponse::BlocksByRoot(_) => true,
493-
RPCResponse::BlobsByRange(_) => true,
494-
RPCResponse::BlobsByRoot(_) => true,
495-
RPCResponse::Pong(_) => false,
496-
RPCResponse::MetaData(_) => false,
497-
RPCResponse::LightClientBootstrap(_) => false,
498-
RPCResponse::LightClientOptimisticUpdate(_) => false,
499-
RPCResponse::LightClientFinalityUpdate(_) => false,
500-
},
501-
RPCCodedResponse::Error(_, _) => true,
502-
// Stream terminations are part of responses that have chunks
503-
RPCCodedResponse::StreamTermination(_) => true,
504-
}
505-
}
506-
507486
/// Returns true if this response always terminates the stream.
508487
pub fn close_after(&self) -> bool {
509488
!matches!(self, RPCCodedResponse::Success(_))

beacon_node/lighthouse_network/src/rpc/outbound.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ impl<E: EthSpec> OutboundRequest<E> {
9191
}
9292
/* These functions are used in the handler for stream management */
9393

94-
/// Number of responses expected for this request.
95-
pub fn expected_responses(&self) -> u64 {
94+
/// Maximum number of responses expected for this request.
95+
pub fn max_responses(&self) -> u64 {
9696
match self {
9797
OutboundRequest::Status(_) => 1,
9898
OutboundRequest::Goodbye(_) => 0,
@@ -105,6 +105,19 @@ impl<E: EthSpec> OutboundRequest<E> {
105105
}
106106
}
107107

108+
pub fn expect_exactly_one_response(&self) -> bool {
109+
match self {
110+
OutboundRequest::Status(_) => true,
111+
OutboundRequest::Goodbye(_) => false,
112+
OutboundRequest::BlocksByRange(_) => false,
113+
OutboundRequest::BlocksByRoot(_) => false,
114+
OutboundRequest::BlobsByRange(_) => false,
115+
OutboundRequest::BlobsByRoot(_) => false,
116+
OutboundRequest::Ping(_) => true,
117+
OutboundRequest::MetaData(_) => true,
118+
}
119+
}
120+
108121
/// Gives the corresponding `SupportedProtocol` to this request.
109122
pub fn versioned_protocol(&self) -> SupportedProtocol {
110123
match self {

beacon_node/lighthouse_network/src/rpc/protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,8 +654,8 @@ pub enum InboundRequest<E: EthSpec> {
654654
impl<E: EthSpec> InboundRequest<E> {
655655
/* These functions are used in the handler for stream management */
656656

657-
/// Number of responses expected for this request.
658-
pub fn expected_responses(&self) -> u64 {
657+
/// Maximum number of responses expected for this request.
658+
pub fn max_responses(&self) -> u64 {
659659
match self {
660660
InboundRequest::Status(_) => 1,
661661
InboundRequest::Goodbye(_) => 0,

beacon_node/lighthouse_network/src/rpc/rate_limiter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,16 +228,16 @@ impl RPCRateLimiterBuilder {
228228

229229
pub trait RateLimiterItem {
230230
fn protocol(&self) -> Protocol;
231-
fn expected_responses(&self) -> u64;
231+
fn max_responses(&self) -> u64;
232232
}
233233

234234
impl<E: EthSpec> RateLimiterItem for super::InboundRequest<E> {
235235
fn protocol(&self) -> Protocol {
236236
self.versioned_protocol().protocol()
237237
}
238238

239-
fn expected_responses(&self) -> u64 {
240-
self.expected_responses()
239+
fn max_responses(&self) -> u64 {
240+
self.max_responses()
241241
}
242242
}
243243

@@ -246,8 +246,8 @@ impl<E: EthSpec> RateLimiterItem for super::OutboundRequest<E> {
246246
self.versioned_protocol().protocol()
247247
}
248248

249-
fn expected_responses(&self) -> u64 {
250-
self.expected_responses()
249+
fn max_responses(&self) -> u64 {
250+
self.max_responses()
251251
}
252252
}
253253
impl RPCRateLimiter {
@@ -299,7 +299,7 @@ impl RPCRateLimiter {
299299
request: &Item,
300300
) -> Result<(), RateLimitedErr> {
301301
let time_since_start = self.init_time.elapsed();
302-
let tokens = request.expected_responses().max(1);
302+
let tokens = request.max_responses().max(1);
303303

304304
let check =
305305
|limiter: &mut Limiter<PeerId>| limiter.allows(time_since_start, peer_id, tokens);

0 commit comments

Comments
 (0)