Skip to content
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2021"

[dev-dependencies]
serde_yaml = "0.8.13"
logging = { path = "../../common/logging" }
state_processing = { path = "../../consensus/state_processing" }
operation_pool = { path = "../operation_pool" }
tokio = "1.14.0"
Expand All @@ -17,6 +16,7 @@ store = { path = "../store" }
network = { path = "../network" }
timer = { path = "../timer" }
lighthouse_network = { path = "../lighthouse_network" }
logging = { path = "../../common/logging" }
parking_lot = "0.12.0"
types = { path = "../../consensus/types" }
eth2_config = { path = "../../common/eth2_config" }
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ where
network_globals: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
log: context.log().clone(),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});

// Discard the error from the oneshot.
Expand Down Expand Up @@ -692,6 +693,7 @@ where
network_senders: self.network_senders.clone(),
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
sse_logging_components: runtime_context.sse_logging_components.clone(),
log: log.clone(),
});

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tree_hash = "0.4.1"
sysinfo = "0.26.5"
system_health = { path = "../../common/system_health" }
directory = { path = "../../common/directory" }
logging = { path = "../../common/logging" }
eth2_serde_utils = "0.1.1"
operation_pool = { path = "../operation_pool" }

Expand Down
49 changes: 49 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use eth2::types::{
};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
use operation_pool::ReceivedPreCapella;
use parking_lot::RwLock;
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct Context<T: BeaconChainTypes> {
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub eth1_service: Option<eth1::Service>,
pub sse_logging_components: Option<SSELoggingComponents>,
pub log: Logger,
}

Expand Down Expand Up @@ -445,6 +447,9 @@ pub fn serve<T: BeaconChainTypes>(
let inner_ctx = ctx.clone();
let log_filter = warp::any().map(move || inner_ctx.log.clone());

let inner_components = ctx.sse_logging_components.clone();
let sse_component_filter = warp::any().map(move || inner_components.clone());

// Create a `warp` filter that provides access to local system information.
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
{
Expand Down Expand Up @@ -3564,6 +3569,49 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// Subscribe to logs via Server Side Events
// /lighthouse/logs
let lighthouse_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(|sse_component: Option<SSELoggingComponents>| {
blocking_response_task(move || {
if let Some(logging_components) = sse_component {
// Build a JSON stream
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default().json_data(json).map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
});

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
))
}
})
});

// Define the ultimate set of routes that will be provided to the server.
// Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`).
let routes = warp::get()
Expand Down Expand Up @@ -3630,6 +3678,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_block_packing_efficiency)
.uor(get_lighthouse_merge_readiness)
.uor(get_events)
.uor(lighthouse_log_events.boxed())
.recover(warp_utils::reject::handle_rejection),
)
.boxed()
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
network_senders: Some(network_senders),
network_globals: Some(network_globals),
eth1_service: Some(eth1_service),
sse_logging_components: None,
log,
});
let ctx = context.clone();
Expand Down
29 changes: 29 additions & 0 deletions book/src/api-lighthouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,32 @@ Caveats:
This is because the state _prior_ to the `start_epoch` needs to be loaded from the database, and
loading a state on a boundary is most efficient.


### `/lighthouse/logs`

This is a Server Side Event subscription endpoint. This allows a user to read
the Lighthouse logs directly from the HTTP API endpoint. This currently
exposes INFO and higher level logs.

Example:

```bash
curl -N "http://localhost:5052/lighthouse/logs"
```

Should provide an output that emits log events as they occur:
```json
{
"data": {
"time": "Mar 13 15:28:41",
"level": "INFO",
"msg": "Syncing",
"service": "slot_notifier",
"est_time": "1 hr 27 mins",
"speed": "5.33 slots/sec",
"distance": "28141 slots (3 days 21 hrs)",
"peers": "8"
}
}
```

30 changes: 30 additions & 0 deletions book/src/api-vc-endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,33 @@ The following fields may be omitted or nullified to obtain default values:
### Example Response Body

*No data is included in the response body.*

## `GET /lighthouse/logs`

Provides a subscription to receive logs as Server Side Events. Currently the
logs emitted are INFO level or higher.

### HTTP Specification

| Property | Specification |
|-------------------|--------------------------------------------|
| Path | `/lighthouse/logs` |
| Method | GET |
| Required Headers | [`Authorization`](./api-vc-auth-header.md) |
| Typical Responses | 200 |

### Example Response Body

```json
{
"data": {
"time": "Mar 13 15:26:53",
"level": "INFO",
"msg": "Connected to beacon node(s)",
"service": "notifier",
"synced": 1,
"available": 1,
"total": 1
}
}
```
7 changes: 7 additions & 0 deletions common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
[dependencies]
slog = "2.5.2"
slog-term = "2.6.0"
tokio = { version = "1.26.0", features = ["sync"] }
lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0"
sloggers = { version = "2.1.1", features = ["json"] }
slog-async = "2.7.0"
take_mut = "0.2.2"
parking_lot = "0.12.1"
serde = "1.0.153"
serde_json = "1.0.94"
chrono = "0.4.23"
Loading