Skip to content

Commit b688cfb

Browse files
committed
More temp commits
1 parent 228b930 commit b688cfb

File tree

18 files changed

+362
-60
lines changed

18 files changed

+362
-60
lines changed

Cargo.lock

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ serde = "1.0.116"
3535
clap_utils = { path = "../common/clap_utils" }
3636
hyper = "0.14.4"
3737
lighthouse_version = { path = "../common/lighthouse_version" }
38+
logging = { path = "../common/logging" }
3839
hex = "0.4.2"
3940
slasher = { path = "../slasher", default-features = false }
4041
monitoring_api = { path = "../common/monitoring_api" }

beacon_node/client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ edition = "2021"
66

77
[dev-dependencies]
88
serde_yaml = "0.8.13"
9-
logging = { path = "../../common/logging" }
109
state_processing = { path = "../../consensus/state_processing" }
1110
operation_pool = { path = "../operation_pool" }
1211
tokio = "1.14.0"
@@ -17,6 +16,7 @@ store = { path = "../store" }
1716
network = { path = "../network" }
1817
timer = { path = "../timer" }
1918
lighthouse_network = { path = "../lighthouse_network" }
19+
logging = { path = "../../common/logging" }
2020
parking_lot = "0.12.0"
2121
types = { path = "../../consensus/types" }
2222
eth2_config = { path = "../../common/eth2_config" }

beacon_node/client/src/builder.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use eth2::{
1919
types::{BlockId, StateId},
2020
BeaconNodeHttpClient, Error as ApiError, Timeouts,
2121
};
22+
use logging::SSELoggingComponents;
2223
use execution_layer::ExecutionLayer;
2324
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
2425
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
@@ -72,6 +73,8 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
7273
http_metrics_config: http_metrics::Config,
7374
slasher: Option<Arc<Slasher<T::EthSpec>>>,
7475
eth_spec_instance: T::EthSpec,
76+
sse_logging_components: Option<SSELoggingComponents>,
77+
7578
}
7679

7780
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@@ -86,7 +89,7 @@ where
8689
/// Instantiates a new, empty builder.
8790
///
8891
/// The `eth_spec_instance` parameter is used to concretize `TEthSpec`.
89-
pub fn new(eth_spec_instance: TEthSpec) -> Self {
92+
pub fn new(eth_spec_instance: TEthSpec, sse_logging_components: Option<SSELoggingComponents>) -> Self {
9093
Self {
9194
slot_clock: None,
9295
store: None,
@@ -104,6 +107,7 @@ where
104107
http_metrics_config: <_>::default(),
105108
slasher: None,
106109
eth_spec_instance,
110+
sse_logging_components,
107111
}
108112
}
109113

@@ -472,6 +476,7 @@ where
472476
network_globals: None,
473477
eth1_service: Some(genesis_service.eth1_service.clone()),
474478
log: context.log().clone(),
479+
sse_logging_components: self.sse_logging_components.take(),
475480
});
476481

477482
// Discard the error from the oneshot.
@@ -596,6 +601,12 @@ where
596601
self
597602
}
598603

604+
/// Adds SSE Logging components to the builder if they exist.
605+
pub fn sse_logging_comonents(mut self, components: Option<SSELoggingComponents>) -> Self {
606+
self.sse_logging_components = components;
607+
self
608+
}
609+
599610
/// Provides configuration for the HTTP server that serves Prometheus metrics.
600611
pub fn http_metrics_config(mut self, config: http_metrics::Config) -> Self {
601612
self.http_metrics_config = config;
@@ -692,6 +703,7 @@ where
692703
network_senders: self.network_senders.clone(),
693704
network_globals: self.network_globals.clone(),
694705
eth1_service: self.eth1_service.clone(),
706+
sse_logging_components: self.sse_logging_components.take(),
695707
log: log.clone(),
696708
});
697709

beacon_node/http_api/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tree_hash = "0.4.1"
3636
sysinfo = "0.26.5"
3737
system_health = { path = "../../common/system_health" }
3838
directory = { path = "../../common/directory" }
39+
logging = { path = "../../common/logging" }
3940
eth2_serde_utils = "0.1.1"
4041
operation_pool = { path = "../operation_pool" }
4142

beacon_node/http_api/src/lib.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ use warp_utils::{
7373
query::multi_key_query,
7474
task::{blocking_json_task, blocking_task},
7575
};
76+
use logging::SSELoggingComponents;
7677

7778
const API_PREFIX: &str = "eth";
7879

@@ -105,6 +106,7 @@ pub struct Context<T: BeaconChainTypes> {
105106
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
106107
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
107108
pub eth1_service: Option<eth1::Service>,
109+
pub sse_logging_components: Option<SSELoggingComponents>,
108110
pub log: Logger,
109111
}
110112

@@ -445,6 +447,10 @@ pub fn serve<T: BeaconChainTypes>(
445447
let inner_ctx = ctx.clone();
446448
let log_filter = warp::any().map(move || inner_ctx.log.clone());
447449

450+
let inner_components = ctx.sse_logging_components.clone();
451+
let sse_component_filter = warp::any().map(move || inner_components.clone());
452+
453+
448454
// Create a `warp` filter that provides access to local system information.
449455
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
450456
{
@@ -3560,6 +3566,80 @@ pub fn serve<T: BeaconChainTypes>(
35603566
},
35613567
);
35623568

3569+
3570+
// Subscribe to logs via Server Side Events
3571+
// /lighthouse/logs
3572+
let get_events = warp::path("lighthouse")
3573+
.and(warp::path("logs"))
3574+
.and(warp::path::end())
3575+
.and_then(
3576+
|topics_res: Result<api_types::EventQuery, warp::Rejection>,
3577+
chain: Arc<BeaconChain<T>>| {
3578+
blocking_task(move || {
3579+
let topics = topics_res?;
3580+
// for each topic subscribed spawn a new subscription
3581+
let mut receivers = Vec::with_capacity(topics.topics.len());
3582+
3583+
if let Some(event_handler) = chain.event_handler.as_ref() {
3584+
for topic in topics.topics {
3585+
let receiver = match topic {
3586+
api_types::EventTopic::Head => event_handler.subscribe_head(),
3587+
api_types::EventTopic::Block => event_handler.subscribe_block(),
3588+
api_types::EventTopic::Attestation => {
3589+
event_handler.subscribe_attestation()
3590+
}
3591+
api_types::EventTopic::VoluntaryExit => {
3592+
event_handler.subscribe_exit()
3593+
}
3594+
api_types::EventTopic::FinalizedCheckpoint => {
3595+
event_handler.subscribe_finalized()
3596+
}
3597+
api_types::EventTopic::ChainReorg => {
3598+
event_handler.subscribe_reorgs()
3599+
}
3600+
api_types::EventTopic::ContributionAndProof => {
3601+
event_handler.subscribe_contributions()
3602+
}
3603+
api_types::EventTopic::LateHead => {
3604+
event_handler.subscribe_late_head()
3605+
}
3606+
api_types::EventTopic::BlockReward => {
3607+
event_handler.subscribe_block_reward()
3608+
}
3609+
};
3610+
3611+
receivers.push(BroadcastStream::new(receiver).map(|msg| {
3612+
match msg {
3613+
Ok(data) => Event::default()
3614+
.event(data.topic_name())
3615+
.json_data(data)
3616+
.map_err(|e| {
3617+
warp_utils::reject::server_sent_event_error(format!(
3618+
"{:?}",
3619+
e
3620+
))
3621+
}),
3622+
Err(e) => Err(warp_utils::reject::server_sent_event_error(
3623+
format!("{:?}", e),
3624+
)),
3625+
}
3626+
}));
3627+
}
3628+
} else {
3629+
return Err(warp_utils::reject::custom_server_error(
3630+
"event handler was not initialized".to_string(),
3631+
));
3632+
}
3633+
3634+
let s = futures::stream::select_all(receivers);
3635+
3636+
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
3637+
})
3638+
},
3639+
);
3640+
3641+
3642+
35633643
// Define the ultimate set of routes that will be provided to the server.
35643644
let routes = warp::get()
35653645
.and(

beacon_node/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use slog::{info, warn};
2121
use std::ops::{Deref, DerefMut};
2222
use std::sync::Arc;
2323
use types::EthSpec;
24+
use logging::SSELoggingComponents;
2425

2526
/// A type-alias to the tighten the definition of a production-intended `Client`.
2627
pub type ProductionClient<E> =
@@ -45,9 +46,10 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
4546
pub async fn new_from_cli(
4647
context: RuntimeContext<E>,
4748
matches: ArgMatches<'static>,
49+
sse_log: Option<SSELoggingComponents>,
4850
) -> Result<Self, String> {
4951
let client_config = get_config::<E>(&matches, &context)?;
50-
Self::new(context, client_config).await
52+
Self::new(context, client_config, sse_log).await
5153
}
5254

5355
/// Starts a new beacon node `Client` in the given `environment`.
@@ -56,6 +58,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
5658
pub async fn new(
5759
context: RuntimeContext<E>,
5860
mut client_config: ClientConfig,
61+
sse_logging_components: Option<SSELoggingComponents>,
5962
) -> Result<Self, String> {
6063
let spec = context.eth2_config().spec.clone();
6164
let client_genesis = client_config.genesis.clone();
@@ -80,7 +83,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
8083
TimeoutRwLock::disable_timeouts()
8184
}
8285

83-
let builder = ClientBuilder::new(context.eth_spec_instance.clone())
86+
let builder = ClientBuilder::new(context.eth_spec_instance.clone(), sse_logging_components)
8487
.runtime_context(context)
8588
.chain_spec(spec)
8689
.http_api_config(client_config.http_api.clone())

common/logging/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
1010
[dependencies]
1111
slog = "2.5.2"
1212
slog-term = "2.6.0"
13+
tokio = "1.14.0"
1314
lighthouse_metrics = { path = "../lighthouse_metrics" }
1415
lazy_static = "1.4.0"
1516
sloggers = { version = "2.1.1", features = ["json"] }
17+
slog-async = "2.7.0"
18+
take_mut = "0.2.2"

0 commit comments

Comments
 (0)