Skip to content

Commit ea3bb06

Browse files
committed
Error from RPC send_response when request doesn't exist on the active inbound requests (#7663)
Squashed commit of the following: commit 0ce690c Author: João Oliveira <[email protected]> Date: Fri Jun 27 22:50:11 2025 +0100 Error from RPC `send_response` when request doesn't exist on the active inbound requests. And handle the error from the main service to check if it may be a data race or a critical bug commit 522e00f Author: Jimmy Chen <[email protected]> Date: Sat Jun 28 05:01:46 2025 +1000 Fix incorrect `waker` update condition (#7656) This bug was first found and partially fixed by @VolodymyrBg in #7317 - this PR applies the same fix everywhere else. The old logic updated the waker when it already matched the context, and did nothing when it was stale: ```rust if waker.will_wake(cx.waker()) { self.waker = Some(cx.waker().clone()); } ``` This is the wrong way around. We only want to update the waker if it doesn't match the current context: ```rust if !waker.will_wake(cx.waker()) { self.waker = Some(cx.waker().clone()); } ``` I don't think we've ever noticed any issues, but it’s a subtle bug that could lead to missed wakeups. commit 83cad25 Author: Jimmy Chen <[email protected]> Date: Sat Jun 28 04:21:17 2025 +1000 Fix Rust 1.88 clippy errors & execution engine tests (#7657) Fix Rust 1.88 clippy errors. commit 9b1f3ed Author: Pawan Dhananjay <[email protected]> Date: Thu Jun 26 17:26:38 2025 -0700 Add gossip check (#7652) N/A Add an additional gossip condition. commit a0a6b93 Author: Daniel Knopik <[email protected]> Date: Wed Jun 25 08:22:24 2025 +0200 Do not compute sync selection proofs for the sync duty at the current slot (#7551) commit 8e3c5d1 Author: chonghe <[email protected]> Date: Wed Jun 25 13:33:17 2025 +0800 Rust 1.89 compiler lint fix (#7644) Fix lints for Rust 1.89 beta compiler commit 56b2d4b Author: Eitan Seri-Levi <[email protected]> Date: Tue Jun 24 09:29:10 2025 +0300 Remove instrumenting log level (#7636) #7155 Theres some additional places we set instrumenting log levels that wasn't covered in #7620 commit fd643c3 Author: Michael Sproul <[email protected]> Date: Mon Jun 23 23:11:46 2025 +1000 Un-ignore EF test for v1.6.0-alpha.1 (#7632) Closes: - #7547 Run the test that was previously ignored when we were between spec versions. commit cef04ee Author: chonghe <[email protected]> Date: Mon Jun 23 16:37:49 2025 +0800 Implement `validator_identities` Beacon API endpoint (#7462) * #7442
1 parent 4d11103 commit ea3bb06

File tree

3 files changed

+33
-54
lines changed

3 files changed

+33
-54
lines changed

beacon_node/lighthouse_network/src/rpc/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
1717
use std::sync::Arc;
1818
use std::task::{Context, Poll};
1919
use std::time::Duration;
20-
use tracing::{debug, error, instrument, trace};
20+
use tracing::{debug, instrument, trace};
2121
use types::{EthSpec, ForkContext};
2222

2323
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -199,8 +199,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
199199
}
200200

201201
/// Sends an RPC response.
202-
///
203-
/// The peer must be connected for this to succeed.
202+
/// Returns an `Err` if the request does exist in the active inbound requests list.
204203
#[instrument(parent = None,
205204
level = "trace",
206205
fields(service = "libp2p_rpc"),
@@ -212,11 +211,10 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
212211
peer_id: PeerId,
213212
request_id: InboundRequestId,
214213
response: RpcResponse<E>,
215-
) {
214+
) -> Result<(), RpcResponse<E>> {
216215
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
217216
else {
218-
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
219-
return;
217+
return Err(response);
220218
};
221219

222220
// Add the request back to active requests if the response is `Success` and requires stream
@@ -229,6 +227,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
229227
}
230228

231229
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
230+
Ok(())
232231
}
233232

234233
fn send_response_inner(
@@ -506,7 +505,8 @@ where
506505
RpcResponse::Success(RpcSuccessResponse::Pong(Ping {
507506
data: self.seq_number,
508507
})),
509-
);
508+
)
509+
.expect("Request to exist");
510510
}
511511

512512
self.events.push(ToSwarm::GenerateEvent(RPCMessage {

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY
1111
use crate::rpc::methods::MetadataRequest;
1212
use crate::rpc::{
1313
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
14-
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
15-
RpcSuccessResponse, RPC,
14+
RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC,
1615
};
1716
use crate::types::{
1817
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
@@ -39,7 +38,7 @@ use std::path::PathBuf;
3938
use std::pin::Pin;
4039
use std::sync::Arc;
4140
use std::time::Duration;
42-
use tracing::{debug, info, instrument, trace, warn};
41+
use tracing::{debug, error, info, instrument, trace, warn};
4342
use types::{
4443
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
4544
};
@@ -1146,35 +1145,22 @@ impl<E: EthSpec> Network<E> {
11461145
name = "libp2p",
11471146
skip_all
11481147
)]
1149-
pub fn send_response(
1148+
pub fn send_response<T: Into<RpcResponse<E>>>(
11501149
&mut self,
11511150
peer_id: PeerId,
11521151
inbound_request_id: InboundRequestId,
1153-
response: Response<E>,
1152+
response: T,
11541153
) {
1155-
self.eth2_rpc_mut()
1156-
.send_response(peer_id, inbound_request_id, response.into())
1157-
}
1158-
1159-
/// Inform the peer that their request produced an error.
1160-
#[instrument(parent = None,
1161-
level = "trace",
1162-
fields(service = "libp2p"),
1163-
name = "libp2p",
1164-
skip_all
1165-
)]
1166-
pub fn send_error_response(
1167-
&mut self,
1168-
peer_id: PeerId,
1169-
inbound_request_id: InboundRequestId,
1170-
error: RpcErrorResponse,
1171-
reason: String,
1172-
) {
1173-
self.eth2_rpc_mut().send_response(
1174-
peer_id,
1175-
inbound_request_id,
1176-
RpcResponse::Error(error, reason.into()),
1177-
)
1154+
if let Err(response) =
1155+
self.eth2_rpc_mut()
1156+
.send_response(peer_id, inbound_request_id, response.into())
1157+
{
1158+
if self.network_globals.peers.read().is_connected(&peer_id) {
1159+
error!(%peer_id, ?inbound_request_id, %response,
1160+
"Request not found in RPC active requests while peer is still connected"
1161+
);
1162+
}
1163+
}
11781164
}
11791165

11801166
/* Peer management functions */
@@ -1460,19 +1446,6 @@ impl<E: EthSpec> Network<E> {
14601446
name = "libp2p",
14611447
skip_all
14621448
)]
1463-
fn send_meta_data_response(
1464-
&mut self,
1465-
_req: MetadataRequest<E>,
1466-
inbound_request_id: InboundRequestId,
1467-
peer_id: PeerId,
1468-
) {
1469-
let metadata = self.network_globals.local_metadata.read().clone();
1470-
// The encoder is responsible for sending the negotiated version of the metadata
1471-
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
1472-
self.eth2_rpc_mut()
1473-
.send_response(peer_id, inbound_request_id, event);
1474-
}
1475-
14761449
// RPC Propagation methods
14771450
/// Queues the response to be sent upwards as long at it was requested outside the Behaviour.
14781451
#[must_use = "return the response"]
@@ -1760,9 +1733,13 @@ impl<E: EthSpec> Network<E> {
17601733
self.peer_manager_mut().ping_request(&peer_id, ping.data);
17611734
None
17621735
}
1763-
RequestType::MetaData(req) => {
1736+
RequestType::MetaData(_req) => {
17641737
// send the requested meta-data
1765-
self.send_meta_data_response(req, inbound_request_id, peer_id);
1738+
let metadata = self.network_globals.local_metadata.read().clone();
1739+
// The encoder is responsible for sending the negotiated version of the metadata
1740+
let response =
1741+
RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
1742+
self.send_response(peer_id, inbound_request_id, response);
17661743
None
17671744
}
17681745
RequestType::Goodbye(reason) => {

beacon_node/network/src/service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender;
1111
use futures::future::OptionFuture;
1212
use futures::prelude::*;
1313

14+
use lighthouse_network::rpc::methods::RpcResponse;
1415
use lighthouse_network::rpc::InboundRequestId;
1516
use lighthouse_network::rpc::RequestType;
1617
use lighthouse_network::service::Network;
@@ -627,10 +628,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
627628
error,
628629
inbound_request_id,
629630
reason,
630-
} => {
631-
self.libp2p
632-
.send_error_response(peer_id, inbound_request_id, error, reason);
633-
}
631+
} => self.libp2p.send_response(
632+
peer_id,
633+
inbound_request_id,
634+
RpcResponse::Error(error, reason.into()),
635+
),
634636
NetworkMessage::ValidationResult {
635637
propagation_source,
636638
message_id,

0 commit comments

Comments
 (0)