Skip to content

Commit 38e0580

Browse files
committed
Further reduce logging overhead: use dedicated writer task and most importantly box the logs to reduce the memory growth of the channels.
1 parent 93a9ebc commit 38e0580

File tree

4 files changed

+57
-74
lines changed

4 files changed

+57
-74
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

trailbase-core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ base64 = { version = "0.22.1", default-features = false }
2424
bytes = { version = "1.8.0", features = ["serde"] }
2525
chrono = "^0.4.38"
2626
cookie = "0.18.1"
27-
crossbeam-channel = "0.5.13"
2827
ed25519-dalek = { version = "2.1.1", features=["pkcs8", "pem", "rand_core"] }
2928
env_logger = "^0.11.3"
3029
fallible-iterator = "0.3.0"

trailbase-core/src/logging.rs

Lines changed: 43 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use axum_client_ip::InsecureClientIp;
55
use log::*;
66
use serde::{Deserialize, Serialize};
77
use serde_json::json;
8-
use std::sync::Arc;
98
use std::time::Duration;
109
use tracing::field::Field;
1110
use tracing::span::{Attributes, Id, Record, Span};
@@ -98,80 +97,66 @@ pub(super) fn sqlite_logger_on_response(
9897
);
9998
}
10099

101-
struct SqliteLogLayerState {
102-
conn: trailbase_sqlite::Connection,
103-
sender: crossbeam_channel::Sender<LogFieldStorage>,
104-
receiver: crossbeam_channel::Receiver<LogFieldStorage>,
105-
}
106-
107100
pub struct SqliteLogLayer {
108-
state: Arc<SqliteLogLayerState>,
101+
sender: tokio::sync::mpsc::UnboundedSender<Box<LogFieldStorage>>,
109102
}
110103

111104
impl SqliteLogLayer {
112105
pub fn new(state: &AppState) -> Self {
106+
// NOTE: We're boxing the channel contents to lower the growth rate of back-stopped unbound
107+
// channels. The underlying container doesn't seem to every shrink :/.
108+
//
113109
// TODO: should we use a bounded receiver to create back-pressure?
114-
let (sender, receiver) = crossbeam_channel::unbounded();
115-
return SqliteLogLayer {
116-
state: Arc::new(SqliteLogLayerState {
117-
conn: state.logs_conn().clone(),
118-
sender,
119-
receiver,
120-
}),
121-
};
122-
}
123-
124-
// The writer runs in a separate Task in the background and receives Logs via a channel, which it
125-
// then writes to Sqlite.
126-
fn write_log(&self, log: LogFieldStorage) -> Result<(), trailbase_sqlite::Error> {
127-
let state = self.state.clone();
128-
state.sender.send(log).expect(BUG_TEXT);
110+
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
129111

112+
let conn = state.logs_conn().clone();
130113
let rt = tokio::runtime::Handle::current();
131114
rt.spawn(async move {
132-
tokio::time::sleep(Duration::from_millis(50)).await;
115+
const LIMIT: usize = 128;
116+
let mut buffer = Vec::<Box<LogFieldStorage>>::with_capacity(LIMIT);
117+
118+
while receiver.recv_many(&mut buffer, LIMIT).await > 0 {
119+
let logs = std::mem::take(&mut buffer);
120+
121+
let result = conn
122+
.call(move |conn| {
123+
if logs.len() > 1 {
124+
let tx = conn.transaction()?;
125+
for log in logs {
126+
SqliteLogLayer::insert_log(&tx, log)?;
127+
}
128+
tx.commit()?;
129+
} else {
130+
for log in logs {
131+
Self::insert_log(conn, log)?
132+
}
133+
}
133134

134-
// Work stealing.
135-
let logs = state.receiver.try_iter().take(128).collect::<Vec<_>>();
136-
if logs.is_empty() {
137-
return;
138-
}
135+
Ok(())
136+
})
137+
.await;
139138

140-
let result = state.conn.call_and_forget(move |conn| {
141-
if logs.len() > 1 {
142-
fn commit(
143-
conn: &mut rusqlite::Connection,
144-
logs: Vec<LogFieldStorage>,
145-
) -> Result<(), rusqlite::Error> {
146-
let tx = conn.transaction()?;
147-
for log in logs {
148-
SqliteLogLayer::insert_log(&tx, log)?;
149-
}
150-
return tx.commit();
151-
}
152-
153-
if let Err(err) = commit(conn, logs) {
154-
log::warn!("log insert failed: {err}");
155-
}
156-
} else {
157-
for log in logs {
158-
if let Err(err) = Self::insert_log(conn, log) {
159-
log::warn!("log insert failed: {err}");
160-
}
161-
}
139+
if let Err(err) = result {
140+
warn!("Failed to send logs: {err}");
162141
}
163-
});
164-
165-
if let Err(err) = result {
166-
error!("{err}");
167142
}
168143
});
169144

170-
return Ok(());
145+
return SqliteLogLayer { sender };
171146
}
172147

148+
// The writer runs in a separate Task in the background and receives Logs via a channel, which it
149+
// then writes to Sqlite.
173150
#[inline]
174-
fn insert_log(conn: &rusqlite::Connection, log: LogFieldStorage) -> Result<(), rusqlite::Error> {
151+
fn write_log(&self, log: LogFieldStorage) {
152+
self.sender.send(Box::new(log)).expect(BUG_TEXT);
153+
}
154+
155+
#[inline]
156+
fn insert_log(
157+
conn: &rusqlite::Connection,
158+
log: Box<LogFieldStorage>,
159+
) -> Result<(), rusqlite::Error> {
175160
lazy_static::lazy_static! {
176161
static ref QUERY: String = indoc::formatdoc! {"
177162
INSERT INTO
@@ -233,9 +218,7 @@ where
233218

234219
storage.level = level_to_int(event.metadata().level());
235220

236-
if let Err(err) = self.write_log(storage) {
237-
warn!("Failed to send to logs to writer: {err}");
238-
}
221+
self.write_log(storage);
239222
}
240223
}
241224
}

trailbase-core/src/server/mod.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use tokio::signal;
1313
use tokio::task::JoinSet;
1414
use tower_cookies::CookieManagerLayer;
1515
use tower_http::{cors, limit::RequestBodyLimitLayer, services::ServeDir, trace::TraceLayer};
16-
use tracing_subscriber::{filter, prelude::*};
1716

1817
use crate::admin;
1918
use crate::app_state::AppState;
@@ -143,17 +142,20 @@ impl Server {
143142
// This declares **where** tracing is being logged to, e.g. stderr, file, sqlite.
144143
//
145144
// NOTE: it's ok to fail. Just means someone else already initialize the tracing sub-system.
146-
let _ = tracing_subscriber::registry()
147-
.with(
148-
logging::SqliteLogLayer::new(&self.state).with_filter(
149-
filter::Targets::new()
150-
.with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
151-
.with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
152-
.with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
153-
.with_default(filter::LevelFilter::INFO),
154-
),
155-
)
156-
.try_init();
145+
// {
146+
// use tracing_subscriber::{filter, prelude::*};
147+
// let _ = tracing_subscriber::registry()
148+
// .with(
149+
// logging::SqliteLogLayer::new(&self.state).with_filter(
150+
// filter::Targets::new()
151+
// .with_target("tower_http::trace::on_response", filter::LevelFilter::DEBUG)
152+
// .with_target("tower_http::trace::on_request", filter::LevelFilter::DEBUG)
153+
// .with_target("tower_http::trace::make_span", filter::LevelFilter::DEBUG)
154+
// .with_default(filter::LevelFilter::INFO),
155+
// ),
156+
// )
157+
// .try_init();
158+
// }
157159

158160
let _raii_tasks = scheduler::start_periodic_tasks(&self.state);
159161

0 commit comments

Comments
 (0)