Skip to content

Commit e0ad4a8

Browse files
committed
Extend trailbase-sqlites execution model to allow for parallel reads. This reduces the latency long-tail for slow reads.
Note that this complicates the APIs, since it pushes the responsibility of declaring a query a read or write to the user to then be scheduled appropriately. Add `.(read_|)query_row_f` APIs similar to rusqlites `conn.query_row` accepting a `|row| -> Result<T>` to reduce the use of `Row` and `Rows`. Make benchmarks more isolated by not sharing a DB across runs accumulating writes.
1 parent 284422c commit e0ad4a8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+920
-561
lines changed

trailbase-cli/src/bin/trail.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ async fn get_user_by_email(
5555
email: &str,
5656
) -> Result<DbUser, BoxError> {
5757
if let Some(user) = conn
58-
.query_value::<DbUser>(
59-
&format!("SELECT * FROM {USER_TABLE} WHERE email = $1"),
58+
.read_query_value::<DbUser>(
59+
format!("SELECT * FROM {USER_TABLE} WHERE email = $1"),
6060
(email.to_string(),),
6161
)
6262
.await?
@@ -165,7 +165,7 @@ async fn async_main() -> Result<(), BoxError> {
165165
match cmd {
166166
Some(AdminSubCommands::List) => {
167167
let users = conn
168-
.query_values::<DbUser>(&format!("SELECT * FROM {USER_TABLE} WHERE admin > 0"), ())
168+
.read_query_values::<DbUser>(format!("SELECT * FROM {USER_TABLE} WHERE admin > 0"), ())
169169
.await?;
170170

171171
println!("{: >36}\temail\tcreated\tupdated", "id");
@@ -183,7 +183,7 @@ async fn async_main() -> Result<(), BoxError> {
183183
Some(AdminSubCommands::Demote { email }) => {
184184
conn
185185
.execute(
186-
&format!("UPDATE {USER_TABLE} SET admin = FALSE WHERE email = $1"),
186+
format!("UPDATE {USER_TABLE} SET admin = FALSE WHERE email = $1"),
187187
(email.clone(),),
188188
)
189189
.await?;
@@ -193,7 +193,7 @@ async fn async_main() -> Result<(), BoxError> {
193193
Some(AdminSubCommands::Promote { email }) => {
194194
conn
195195
.execute(
196-
&format!("UPDATE {USER_TABLE} SET admin = TRUE WHERE email = $1"),
196+
format!("UPDATE {USER_TABLE} SET admin = TRUE WHERE email = $1"),
197197
(email.clone(),),
198198
)
199199
.await?;

trailbase-core/benches/benchmark.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ async fn add_room(
6060
name: &str,
6161
) -> Result<[u8; 16], anyhow::Error> {
6262
let room: [u8; 16] = conn
63-
.query_row(
63+
.query_row_f(
6464
"INSERT INTO room (name) VALUES ($1) RETURNING id",
6565
params!(name.to_string()),
66+
|row| row.get(0),
6667
)
6768
.await?
68-
.unwrap()
69-
.get(0)?;
69+
.unwrap();
7070

7171
return Ok(room);
7272
}
@@ -111,11 +111,10 @@ async fn setup_app() -> Result<Setup, anyhow::Error> {
111111
})
112112
.await?;
113113

114-
let state = app.state();
115-
let conn = state.conn();
114+
let conn = app.state.conn();
116115

117116
create_chat_message_app_tables(conn).await?;
118-
state.refresh_table_cache().await?;
117+
app.state.refresh_table_cache().await?;
119118

120119
let room = add_room(conn, "room0").await?;
121120
let password = "Secret!1!!";
@@ -124,7 +123,7 @@ async fn setup_app() -> Result<Setup, anyhow::Error> {
124123
r#"(SELECT 1 FROM room_members WHERE user = _USER_.id AND room = _REQ_.room)"#;
125124

126125
add_record_api_config(
127-
&state,
126+
&app.state,
128127
RecordApiConfig {
129128
name: Some("messages_api".to_string()),
130129
table_name: Some("message".to_string()),
@@ -137,7 +136,7 @@ async fn setup_app() -> Result<Setup, anyhow::Error> {
137136

138137
let email = "[email protected]";
139138
let user_x = create_user_handler(
140-
State(state.clone()),
139+
State(app.state.clone()),
141140
Json(CreateUserRequest {
142141
email: email.to_string(),
143142
password: password.to_string(),
@@ -149,7 +148,7 @@ async fn setup_app() -> Result<Setup, anyhow::Error> {
149148
.id
150149
.into_bytes();
151150

152-
let user_x_token = login_with_password(&state, email, password)
151+
let user_x_token = login_with_password(&app.state, email, password)
153152
.await?
154153
.auth_token;
155154

@@ -208,7 +207,7 @@ fn create_message_benchmark(b: &mut Bencher, runtime: &tokio::runtime::Runtime,
208207
let tasks = (0..iters).map(|_i| {
209208
let body = body.clone();
210209
let auth = format!("Bearer {user_x_token}");
211-
let mut router = setup.app.router().clone();
210+
let mut router = setup.app.main_router.1.clone();
212211

213212
return runtime.spawn(async move {
214213
let response = router
@@ -244,7 +243,7 @@ fn benchmark_group(c: &mut Criterion) {
244243

245244
let setup = runtime.block_on(async {
246245
let setup = setup_app().await.unwrap();
247-
let mut router = setup.app.router().clone();
246+
let mut router = setup.app.main_router.1.clone();
248247

249248
ServiceExt::<Request<Body>>::ready(&mut router)
250249
.await

trailbase-core/src/admin/list_logs.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,13 @@ pub async fn list_logs_handler(
126126
build_filter_where_clause(&table_metadata.schema.columns, filter_params)?;
127127

128128
let total_row_count: i64 = conn
129-
.query_value(
130-
&format!(
129+
.read_query_row_f(
130+
format!(
131131
"SELECT COUNT(*) FROM {LOGS_TABLE_NAME} WHERE {where_clause}",
132132
where_clause = filter_where_clause.clause
133133
),
134134
filter_where_clause.params.clone(),
135+
|row| row.get(0),
135136
)
136137
.await?
137138
.unwrap_or(-1);
@@ -243,7 +244,11 @@ async fn fetch_logs(
243244
"#,
244245
);
245246

246-
return Ok(conn.query_values::<LogEntry>(&sql_query, params).await?);
247+
return Ok(
248+
conn
249+
.read_query_values::<LogEntry>(sql_query, params)
250+
.await?,
251+
);
247252
}
248253

249254
#[derive(Debug, Serialize, TS)]
@@ -315,7 +320,7 @@ async fn fetch_aggregate_stats(
315320
params.extend(filter.params.clone())
316321
}
317322

318-
let rows = conn.query_values::<AggRow>(&qps_query, params).await?;
323+
let rows = conn.read_query_values::<AggRow>(qps_query, params).await?;
319324

320325
let mut rate: Vec<(i64, f64)> = vec![];
321326
for r in rows.iter() {
@@ -348,7 +353,7 @@ async fn fetch_aggregate_stats(
348353
"#
349354
);
350355

351-
let rows = conn.query(&cc_query, ()).await?;
356+
let rows = conn.read_query_rows(cc_query, ()).await?;
352357

353358
let mut country_codes = HashMap::<String, usize>::new();
354359
for row in rows.iter() {
@@ -409,7 +414,7 @@ mod tests {
409414
let smack_in_there1 = (from + Duration::seconds(12 * 3600 + 1)).timestamp();
410415

411416
conn
412-
.execute_batch(&format!(
417+
.execute_batch(format!(
413418
r#"
414419
INSERT INTO {LOGS_TABLE_NAME} (created) VALUES({before});
415420
INSERT INTO {LOGS_TABLE_NAME} (created) VALUES({after});

trailbase-core/src/admin/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub async fn query_handler(
7272
));
7373
}
7474

75-
let batched_rows_result = state.conn().execute_batch(&request.query).await;
75+
let batched_rows_result = state.conn().execute_batch(request.query).await;
7676

7777
// In the fallback case we always need to invalidate the cache.
7878
if must_invalidate_table_cache {

trailbase-core/src/admin/rows/delete_rows.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ mod tests {
182182

183183
return state
184184
.conn()
185-
.query_value::<TestTable>(
186-
&format!("SELECT * FROM {table_name} WHERE _rowid_ = ?1"),
185+
.read_query_value::<TestTable>(
186+
format!("SELECT * FROM {table_name} WHERE _rowid_ = ?1"),
187187
trailbase_sqlite::params!(row_id),
188188
)
189189
.await
@@ -203,12 +203,12 @@ mod tests {
203203

204204
let count = || async {
205205
conn
206-
.query_row(&format!("SELECT COUNT(*) FROM '{table_name}'"), ())
206+
.read_query_row_f(format!("SELECT COUNT(*) FROM '{table_name}'"), (), |row| {
207+
row.get::<_, i64>(0)
208+
})
207209
.await
208210
.unwrap()
209211
.unwrap()
210-
.get::<i64>(0)
211-
.unwrap()
212212
};
213213

214214
assert_eq!(count().await, 2);

trailbase-core/src/admin/rows/list_rows.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ pub async fn list_rows_handler(
7272
}
7373
};
7474

75-
let total_row_count = {
75+
let total_row_count: i64 = {
7676
let where_clause = &filter_where_clause.clause;
7777
let count_query = format!("SELECT COUNT(*) FROM '{table_name}' WHERE {where_clause}");
7878
state
7979
.conn()
80-
.query_value::<i64>(&count_query, filter_where_clause.params.clone())
80+
.read_query_row_f(count_query, filter_where_clause.params.clone(), |row| {
81+
row.get(0)
82+
})
8183
.await?
8284
.unwrap_or(-1)
8385
};
@@ -207,12 +209,15 @@ async fn fetch_rows(
207209
"#,
208210
);
209211

210-
let result_rows = conn.query(&query, params).await.map_err(|err| {
211-
#[cfg(debug_assertions)]
212-
error!("QUERY: {query}\n\t=> {err}");
212+
let result_rows = conn
213+
.read_query_rows(query.clone(), params)
214+
.await
215+
.map_err(|err| {
216+
#[cfg(debug_assertions)]
217+
error!("QUERY: {query}\n\t=> {err}");
213218

214-
return err;
215-
})?;
219+
return err;
220+
})?;
216221

217222
return Ok(rows_to_json_arrays(result_rows, 1024)?);
218223
}
@@ -259,7 +264,7 @@ mod tests {
259264
.unwrap();
260265

261266
let cnt: i64 = conn
262-
.query_value("SELECT COUNT(*) FROM test_table", ())
267+
.read_query_row_f("SELECT COUNT(*) FROM test_table", (), |row| row.get(0))
263268
.await
264269
.unwrap()
265270
.unwrap();

trailbase-core/src/admin/table/alter_table.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ mod tests {
174174
);
175175
let _ = create_table_handler(State(state.clone()), Json(create_table_request.clone())).await?;
176176

177-
conn.query(&format!("SELECT {pk_col} FROM foo"), ()).await?;
177+
conn
178+
.read_query_rows(format!("SELECT {pk_col} FROM foo"), ())
179+
.await?;
178180

179181
{
180182
// Noop: source and target identical.
@@ -187,7 +189,9 @@ mod tests {
187189
.await
188190
.unwrap();
189191

190-
conn.query(&format!("SELECT {pk_col} FROM foo"), ()).await?;
192+
conn
193+
.read_query_rows(format!("SELECT {pk_col} FROM foo"), ())
194+
.await?;
191195
}
192196

193197
{
@@ -215,7 +219,7 @@ mod tests {
215219
.unwrap();
216220

217221
conn
218-
.query(&format!("SELECT {pk_col}, new FROM foo"), ())
222+
.read_query_rows(format!("SELECT {pk_col}, new FROM foo"), ())
219223
.await?;
220224
}
221225

@@ -236,8 +240,10 @@ mod tests {
236240
.await
237241
.unwrap();
238242

239-
assert!(conn.query("SELECT * FROM foo", ()).await.is_err());
240-
conn.query(&format!("SELECT {pk_col} FROM bar"), ()).await?;
243+
assert!(conn.read_query_rows("SELECT * FROM foo", ()).await.is_err());
244+
conn
245+
.read_query_rows(format!("SELECT {pk_col} FROM bar"), ())
246+
.await?;
241247
}
242248

243249
return Ok(());

trailbase-core/src/admin/table/list_tables.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ pub async fn list_tables_handler(
4141
// "view"s.
4242
let rows = state
4343
.conn()
44-
.query_values::<SqliteSchema>(
45-
&format!("SELECT type, name, tbl_name, sql FROM {SQLITE_SCHEMA_TABLE} ORDER BY type"),
44+
.read_query_values::<SqliteSchema>(
45+
format!("SELECT type, name, tbl_name, sql FROM {SQLITE_SCHEMA_TABLE} ORDER BY type"),
4646
(),
4747
)
4848
.await?;

trailbase-core/src/admin/user/create_user.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ pub async fn create_user_handler(
6565

6666
let Some(user) = state
6767
.user_conn()
68-
.query_value::<DbUser>(
69-
&INSERT_USER_QUERY,
68+
.write_query_value::<DbUser>(
69+
&*INSERT_USER_QUERY,
7070
named_params! {
7171
":email": normalized_email,
7272
":password_hash": hashed_password,

trailbase-core/src/admin/user/list_users.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@ pub async fn list_users_handler(
8181
build_filter_where_clause(&table_metadata.schema.columns, filter_params)?;
8282

8383
let total_row_count: i64 = conn
84-
.query_value(
85-
&format!(
84+
.read_query_row_f(
85+
format!(
8686
"SELECT COUNT(*) FROM {USER_TABLE} WHERE {where_clause}",
8787
where_clause = filter_where_clause.clause
8888
),
8989
filter_where_clause.params.clone(),
90+
|row| row.get(0),
9091
)
9192
.await?
9293
.unwrap_or(-1);
@@ -161,6 +162,6 @@ async fn fetch_users(
161162
"#,
162163
);
163164

164-
let users = conn.query_values::<DbUser>(&sql_query, params).await?;
165+
let users = conn.read_query_values::<DbUser>(sql_query, params).await?;
165166
return Ok(users);
166167
}

0 commit comments

Comments
 (0)