Skip to content

Commit 93a9ebc

Browse files
committed
Implement batch log inserts via stealing.
1 parent 03d918c commit 93a9ebc

File tree

3 files changed

+86
-32
lines changed

3 files changed

+86
-32
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

trailbase-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ base64 = { version = "0.22.1", default-features = false }
2424
bytes = { version = "1.8.0", features = ["serde"] }
2525
chrono = "^0.4.38"
2626
cookie = "0.18.1"
27+
crossbeam-channel = "0.5.13"
2728
ed25519-dalek = { version = "2.1.1", features=["pkcs8", "pem", "rand_core"] }
2829
env_logger = "^0.11.3"
2930
fallible-iterator = "0.3.0"

trailbase-core/src/logging.rs

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use axum_client_ip::InsecureClientIp;
55
use log::*;
66
use serde::{Deserialize, Serialize};
77
use serde_json::json;
8+
use std::sync::Arc;
89
use std::time::Duration;
910
use tracing::field::Field;
1011
use tracing::span::{Attributes, Id, Record, Span};
@@ -97,54 +98,105 @@ pub(super) fn sqlite_logger_on_response(
9798
);
9899
}
99100

100-
pub struct SqliteLogLayer {
101+
struct SqliteLogLayerState {
101102
conn: trailbase_sqlite::Connection,
103+
sender: crossbeam_channel::Sender<LogFieldStorage>,
104+
receiver: crossbeam_channel::Receiver<LogFieldStorage>,
105+
}
106+
107+
pub struct SqliteLogLayer {
108+
state: Arc<SqliteLogLayerState>,
102109
}
103110

104111
impl SqliteLogLayer {
105112
pub fn new(state: &AppState) -> Self {
113+
// TODO: should we use a bounded receiver to create back-pressure?
114+
let (sender, receiver) = crossbeam_channel::unbounded();
106115
return SqliteLogLayer {
107-
conn: state.logs_conn().clone(),
116+
state: Arc::new(SqliteLogLayerState {
117+
conn: state.logs_conn().clone(),
118+
sender,
119+
receiver,
120+
}),
108121
};
109122
}
110123

111124
// The writer runs in a separate Task in the background and receives Logs via a channel, which it
112125
// then writes to Sqlite.
113-
//
114-
// TODO: should we use a bounded receiver to create back-pressure?
115-
// TODO: We could use recv_many() and batch insert.
116126
fn write_log(&self, log: LogFieldStorage) -> Result<(), trailbase_sqlite::Error> {
117-
return self.conn.call_and_forget(move |conn| {
118-
let mut stmt = match conn.prepare_cached(
119-
r#"
127+
let state = self.state.clone();
128+
state.sender.send(log).expect(BUG_TEXT);
129+
130+
let rt = tokio::runtime::Handle::current();
131+
rt.spawn(async move {
132+
tokio::time::sleep(Duration::from_millis(50)).await;
133+
134+
// Work stealing.
135+
let logs = state.receiver.try_iter().take(128).collect::<Vec<_>>();
136+
if logs.is_empty() {
137+
return;
138+
}
139+
140+
let result = state.conn.call_and_forget(move |conn| {
141+
if logs.len() > 1 {
142+
fn commit(
143+
conn: &mut rusqlite::Connection,
144+
logs: Vec<LogFieldStorage>,
145+
) -> Result<(), rusqlite::Error> {
146+
let tx = conn.transaction()?;
147+
for log in logs {
148+
SqliteLogLayer::insert_log(&tx, log)?;
149+
}
150+
return tx.commit();
151+
}
152+
153+
if let Err(err) = commit(conn, logs) {
154+
log::warn!("log insert failed: {err}");
155+
}
156+
} else {
157+
for log in logs {
158+
if let Err(err) = Self::insert_log(conn, log) {
159+
log::warn!("log insert failed: {err}");
160+
}
161+
}
162+
}
163+
});
164+
165+
if let Err(err) = result {
166+
error!("{err}");
167+
}
168+
});
169+
170+
return Ok(());
171+
}
172+
173+
#[inline]
174+
fn insert_log(conn: &rusqlite::Connection, log: LogFieldStorage) -> Result<(), rusqlite::Error> {
175+
lazy_static::lazy_static! {
176+
static ref QUERY: String = indoc::formatdoc! {"
120177
INSERT INTO
121178
_logs (type, level, status, method, url, latency, client_ip, referer, user_agent)
122179
VALUES
123180
($1, $2, $3, $4, $5, $6, $7, $8, $9)
124-
"#,
125-
) {
126-
Ok(stmt) => stmt,
127-
Err(err) => {
128-
warn!("Logs stmt failed: {err}");
129-
return;
130-
}
131-
};
132-
133-
if let Err(err) = stmt.execute(rusqlite::params!(
134-
// "type": FIXME: should be: admin, records, auth, other request
135-
LogType::HttpRequest as i32,
136-
log.level,
137-
log.status,
138-
log.method,
139-
log.uri,
140-
log.latency_ms,
141-
log.client_ip,
142-
log.referer,
143-
log.user_agent // FIXME: we're not writing the JSON data.
144-
)) {
145-
warn!("logs writing failed: {err}");
146-
}
147-
});
181+
"};
182+
}
183+
184+
let mut stmt = conn.prepare_cached(&QUERY)?;
185+
stmt.execute((
186+
// FIXME: we're not writing the JSON data.
187+
// FIXME: type-field is hard-coded. Should be: admin, records, auth, other request
188+
LogType::HttpRequest as i32,
189+
log.level,
190+
log.status,
191+
log.method,
192+
log.uri,
193+
log.latency_ms,
194+
log.client_ip,
195+
log.referer,
196+
log.user_agent,
197+
))?;
198+
199+
return Ok(());
148200
}
149201
}
150202

0 commit comments

Comments
 (0)