Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions beacon_node/network/src/beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,24 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
}
}

pub struct BeaconProcessorSend<T: BeaconChainTypes>(pub mpsc::Sender<WorkEvent<T>>);

impl<T: BeaconChainTypes> BeaconProcessorSend<T> {
pub fn try_send(&self, message: WorkEvent<T>) -> Result<(), TrySendError<WorkEvent<T>>> {
let work_type = message.work_type();
match self.0.try_send(message) {
Ok(res) => Ok(res),
Err(e) => {
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
&[work_type],
);
Err(e)
}
}
}
}

/// A consensus message (or multiple) from the network that requires processing.
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ lazy_static! {
"Gossipsub light_client_optimistic_update errors per error type",
&["type"]
);
pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"beacon_processor_send_error_per_work_type",
"Total number of beacon processor send error per work type",
&["type"]
);


/*
Expand Down
20 changes: 14 additions & 6 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#![allow(clippy::unit_arg)]

use crate::beacon_processor::{
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
BeaconProcessor, BeaconProcessorSend, InvalidBlockStorage, WorkEvent as BeaconWorkEvent,
MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::error;
use crate::service::{NetworkMessage, RequestId};
Expand All @@ -19,6 +20,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
};
use logging::TimeLatch;
use slog::{debug, o, trace};
use slog::{error, warn};
use std::cmp;
Expand All @@ -39,9 +41,11 @@ pub struct Router<T: BeaconChainTypes> {
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
/// The `Router` logger.
log: slog::Logger,
/// Provides de-bounce functionality for logging.
logger_debounce: TimeLatch,
}

/// Types of messages the router can receive.
Expand Down Expand Up @@ -100,7 +104,7 @@ impl<T: BeaconChainTypes> Router<T> {
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
beacon_processor_send.clone(),
BeaconProcessorSend(beacon_processor_send.clone()),
sync_logger,
);

Expand All @@ -124,8 +128,9 @@ impl<T: BeaconChainTypes> Router<T> {
chain: beacon_chain,
sync_send,
network: HandlerNetworkContext::new(network_send, log.clone()),
beacon_processor_send,
beacon_processor_send: BeaconProcessorSend(beacon_processor_send),
log: message_handler_log,
logger_debounce: TimeLatch::default(),
};

// spawn handler task and move the message handler instance into the spawned thread
Expand Down Expand Up @@ -483,8 +488,11 @@ impl<T: BeaconChainTypes> Router<T> {
mpsc::error::TrySendError::Closed(work)
| mpsc::error::TrySendError::Full(work) => work.work_type(),
};
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)

if self.logger_debounce.elapsed() {
error!(&self.log, "Unable to send message to the beacon processor";
"error" => %e, "type" => work_type)
}
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use crate::beacon_processor::BeaconProcessorSend;
use crate::service::RequestId;
use crate::sync::manager::RequestId as SyncId;
use crate::NetworkMessage;
Expand Down Expand Up @@ -54,7 +55,7 @@ impl TestRig {
SyncNetworkContext::new(
network_tx,
globals,
beacon_processor_tx,
BeaconProcessorSend(beacon_processor_tx),
log.new(slog::o!("component" => "network_context")),
)
};
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups;
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::beacon_processor::{BeaconProcessorSend, ChainSegmentProcessId};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
Expand Down Expand Up @@ -188,7 +188,7 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
log: slog::Logger,
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
assert!(
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId};
use crate::beacon_processor::WorkEvent;
use crate::beacon_processor::BeaconProcessorSend;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChainTypes, EngineState};
Expand Down Expand Up @@ -37,7 +37,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
execution_engine_state: EngineState,

/// Channel to send work to the beacon processor.
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,

/// Logger for the `SyncNetworkContext`.
log: slog::Logger,
Expand All @@ -47,7 +47,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
beacon_processor_send: BeaconProcessorSend<T>,
log: slog::Logger,
) -> Self {
Self {
Expand Down Expand Up @@ -278,12 +278,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}

pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender<WorkEvent<T>>> {
pub fn processor_channel_if_enabled(&self) -> Option<&BeaconProcessorSend<T>> {
self.is_execution_engine_online()
.then_some(&self.beacon_processor_send)
}

pub fn processor_channel(&self) -> &mpsc::Sender<WorkEvent<T>> {
pub fn processor_channel(&self) -> &BeaconProcessorSend<T> {
&self.beacon_processor_send
}

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/range_sync/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ mod tests {
use crate::NetworkMessage;

use super::*;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::beacon_processor::{BeaconProcessorSend, WorkEvent as BeaconWorkEvent};
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock;
Expand Down Expand Up @@ -603,7 +603,7 @@ mod tests {
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),
beacon_processor_tx,
BeaconProcessorSend(beacon_processor_tx),
log.new(o!("component" => "network_context")),
);
let test_rig = TestRig {
Expand Down