Skip to content

Commit cd6edbc

Browse files
committed
Getting closer
1 parent e9eb455 commit cd6edbc

File tree

8 files changed

+215
-143
lines changed

8 files changed

+215
-143
lines changed

Cargo.lock

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/http_api/src/lib.rs

Lines changed: 14 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3572,74 +3572,33 @@ pub fn serve<T: BeaconChainTypes>(
35723572
let get_events = warp::path("lighthouse")
35733573
.and(warp::path("logs"))
35743574
.and(warp::path::end())
3575+
.and(sse_component_filter)
35753576
.and_then(
3576-
|topics_res: Result<api_types::EventQuery, warp::Rejection>,
3577-
chain: Arc<BeaconChain<T>>| {
3577+
|sse_component: Option<SSELoggingComponents>| {
35783578
blocking_task(move || {
3579-
let topics = topics_res?;
3580-
// for each topic subscribed spawn a new subscription
3581-
let mut receivers = Vec::with_capacity(topics.topics.len());
35823579

3583-
if let Some(event_handler) = chain.event_handler.as_ref() {
3584-
for topic in topics.topics {
3585-
let receiver = match topic {
3586-
api_types::EventTopic::Head => event_handler.subscribe_head(),
3587-
api_types::EventTopic::Block => event_handler.subscribe_block(),
3588-
api_types::EventTopic::Attestation => {
3589-
event_handler.subscribe_attestation()
3590-
}
3591-
api_types::EventTopic::VoluntaryExit => {
3592-
event_handler.subscribe_exit()
3593-
}
3594-
api_types::EventTopic::FinalizedCheckpoint => {
3595-
event_handler.subscribe_finalized()
3596-
}
3597-
api_types::EventTopic::ChainReorg => {
3598-
event_handler.subscribe_reorgs()
3599-
}
3600-
api_types::EventTopic::ContributionAndProof => {
3601-
event_handler.subscribe_contributions()
3602-
}
3603-
api_types::EventTopic::LateHead => {
3604-
event_handler.subscribe_late_head()
3605-
}
3606-
api_types::EventTopic::BlockReward => {
3607-
event_handler.subscribe_block_reward()
3608-
}
3609-
};
3580+
if let Some(logging_components) = sse_component {
3581+
// Build a JSON stream
3582+
let
3583+
let s = BroadcastStream::new(sse_component.sender.subscribe()).map(|msg| {
3584+
// Serialize to json
36103585

3611-
receivers.push(BroadcastStream::new(receiver).map(|msg| {
3612-
match msg {
3613-
Ok(data) => Event::default()
3614-
.event(data.topic_name())
3615-
.json_data(data)
3616-
.map_err(|e| {
3617-
warp_utils::reject::server_sent_event_error(format!(
3618-
"{:?}",
3619-
e
3620-
))
3621-
}),
3622-
Err(e) => Err(warp_utils::reject::server_sent_event_error(
3623-
format!("{:?}", e),
3624-
)),
3625-
}
3626-
}));
3627-
}
3586+
3587+
3588+
;
3589+
3590+
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
36283591
} else {
36293592
return Err(warp_utils::reject::custom_server_error(
3630-
"event handler was not initialized".to_string(),
3593+
"SSE Logging is not enabled".to_string(),
36313594
));
36323595
}
3633-
3634-
let s = futures::stream::select_all(receivers);
3635-
3636-
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
36373596
})
3597+
36383598
},
36393599
);
36403600

36413601

3642-
36433602
// Define the ultimate set of routes that will be provided to the server.
36443603
let routes = warp::get()
36453604
.and(

common/logging/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
1010
[dependencies]
1111
slog = "2.5.2"
1212
slog-term = "2.6.0"
13+
tokio = "1.14.0"
1314
lighthouse_metrics = { path = "../lighthouse_metrics" }
1415
lazy_static = "1.4.0"
1516
sloggers = { version = "2.1.1", features = ["json"] }
1617
slog-async = "2.7.0"
1718
take_mut = "0.2.2"
18-
crossbeam-channel = "0.5.7"
19+
parking_lot = "0.12.1"
20+
serde = "1.0.153"

common/logging/src/async_record.rs

Lines changed: 147 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
//! An object that can be used to pass through a channel and be cloned. It can therefore be used
22
//! via the broadcast channel.
33
4+
use parking_lot::Mutex;
5+
use serde::ser::SerializeMap;
6+
use serde::serde_if_integer128;
7+
use serde::Serialize;
48
use slog::{
59
BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV,
610
};
11+
use std::cell::RefCell;
712
use std::fmt;
13+
use std::fmt::Write;
14+
use std::io;
815
use std::sync::Arc;
916
use take_mut::take;
1017

18+
thread_local! {
19+
static TL_BUF: RefCell<String> = RefCell::new(String::with_capacity(128))
20+
}
21+
1122
/// Serialized record.
1223
#[derive(Clone)]
1324
pub struct AsyncRecord {
@@ -16,7 +27,7 @@ pub struct AsyncRecord {
1627
location: Box<slog::RecordLocation>,
1728
tag: String,
1829
logger_values: OwnedKVList,
19-
kv: Arc<dyn KV + Send>,
30+
kv: Arc<Mutex<dyn KV + Send>>,
2031
}
2132

2233
impl AsyncRecord {
@@ -34,7 +45,7 @@ impl AsyncRecord {
3445
location: Box::new(*record.location()),
3546
tag: String::from(record.tag()),
3647
logger_values: logger_values.clone(),
37-
kv: Arc::new(ser.finish()),
48+
kv: Arc::new(Mutex::new(ser.finish())),
3849
}
3950
}
4051

@@ -46,8 +57,9 @@ impl AsyncRecord {
4657
tag: &self.tag,
4758
};
4859

60+
let kv = self.kv.lock();
4961
drain.log(
50-
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
62+
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
5163
&self.logger_values,
5264
)
5365
}
@@ -60,8 +72,9 @@ impl AsyncRecord {
6072
tag: &self.tag,
6173
};
6274

75+
let kv = self.kv.lock();
6376
f(
64-
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
77+
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
6578
&self.logger_values,
6679
)
6780
}
@@ -167,11 +180,137 @@ impl Serializer for ToSendSerializer {
167180
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
168181
Ok(())
169182
}
183+
}
170184

171-
#[cfg(feature = "nested-values")]
172-
fn emit_serde(&mut self, key: Key, value: &slog::SerdeValue) -> slog::Result {
173-
let val = value.to_sendable();
174-
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
185+
impl Serialize for AsyncRecord {
186+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
187+
where
188+
S: serde::Serializer,
189+
{
190+
let rs = RecordStatic {
191+
location: &*self.location,
192+
level: self.level,
193+
tag: &self.tag,
194+
};
195+
let mut map_serializer = SerdeSerializer::new(serializer)?;
196+
let kv = self.kv.lock();
197+
let message = format_args!("{}", self.msg);
198+
let record = Record::new(&rs, &message, BorrowedKV(&(*kv)));
199+
200+
self.logger_values
201+
.serialize(&record, &mut map_serializer)
202+
.map_err(|e| serde::ser::Error::custom(e))?;
203+
record
204+
.kv()
205+
.serialize(&record, &mut map_serializer)
206+
.map_err(serde::ser::Error::custom)?;
207+
map_serializer.end()
208+
}
209+
}
210+
211+
struct SerdeSerializer<S: serde::Serializer> {
212+
/// Current state of map serializing: `serde::Serializer::MapState`
213+
ser_map: S::SerializeMap,
214+
}
215+
216+
impl<S: serde::Serializer> SerdeSerializer<S> {
217+
fn new(ser: S) -> Result<Self, S::Error> {
218+
let ser_map = ser.serialize_map(None)?;
219+
Ok(SerdeSerializer { ser_map })
220+
}
221+
222+
/// Finish serialization, and return the serializer
223+
fn end(self) -> Result<S::Ok, S::Error> {
224+
self.ser_map.end()
225+
}
226+
}
227+
228+
// NOTE: This is borrowed from slog_json
229+
macro_rules! impl_m(
230+
($s:expr, $key:expr, $val:expr) => ({
231+
let k_s: &str = $key.as_ref();
232+
$s.ser_map.serialize_entry(k_s, $val)
233+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("serde serialization error: {}", e)))?;
175234
Ok(())
235+
});
236+
);
237+
238+
impl<S> slog::Serializer for SerdeSerializer<S>
239+
where
240+
S: serde::Serializer,
241+
{
242+
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
243+
impl_m!(self, key, &val)
244+
}
245+
246+
fn emit_unit(&mut self, key: Key) -> slog::Result {
247+
impl_m!(self, key, &())
248+
}
249+
250+
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
251+
impl_m!(self, key, &val)
252+
}
253+
254+
fn emit_none(&mut self, key: Key) -> slog::Result {
255+
let val: Option<()> = None;
256+
impl_m!(self, key, &val)
257+
}
258+
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
259+
impl_m!(self, key, &val)
260+
}
261+
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
262+
impl_m!(self, key, &val)
263+
}
264+
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
265+
impl_m!(self, key, &val)
266+
}
267+
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
268+
impl_m!(self, key, &val)
269+
}
270+
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
271+
impl_m!(self, key, &val)
272+
}
273+
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
274+
impl_m!(self, key, &val)
275+
}
276+
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
277+
impl_m!(self, key, &val)
278+
}
279+
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
280+
impl_m!(self, key, &val)
281+
}
282+
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
283+
impl_m!(self, key, &val)
284+
}
285+
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
286+
impl_m!(self, key, &val)
287+
}
288+
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
289+
impl_m!(self, key, &val)
290+
}
291+
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
292+
impl_m!(self, key, &val)
293+
}
294+
serde_if_integer128! {
295+
fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
296+
impl_m!(self, key, &val)
297+
}
298+
fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
299+
impl_m!(self, key, &val)
300+
}
301+
}
302+
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
303+
impl_m!(self, key, &val)
304+
}
305+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
306+
TL_BUF.with(|buf| {
307+
let mut buf = buf.borrow_mut();
308+
309+
buf.write_fmt(*val).unwrap();
310+
311+
let res = { || impl_m!(self, key, &*buf) }();
312+
buf.clear();
313+
res
314+
})
176315
}
177316
}

common/logging/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use std::time::{Duration, Instant};
1212
pub const MAX_MESSAGE_WIDTH: usize = 40;
1313

1414
pub mod async_record;
15-
mod sse_drain;
16-
pub use sse_drain::{SSEDrain, SSELoggingComponents};
15+
mod sse_logging_components;
16+
pub use sse_logging_components::SSELoggingComponents;
1717

1818
/// The minimum interval between log messages indicating that a queue is full.
1919
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);

0 commit comments

Comments
 (0)