Skip to content

Commit c06b3ad

Browse files
committed
Close sqlite connections locally rather than on the worker loops.
1 parent 95aeb04 commit c06b3ad

File tree

2 files changed

+29
-66
lines changed

2 files changed

+29
-66
lines changed

trailbase-sqlite/src/connection.rs

Lines changed: 13 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub type Result<T> = std::result::Result<T, Error>;
4848
enum Message {
4949
RunMut(Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>),
5050
RunConst(Box<dyn FnOnce(&rusqlite::Connection) + Send + 'static>),
51-
Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
51+
Terminate,
5252
}
5353

5454
#[derive(Clone)]
@@ -461,48 +461,22 @@ impl Connection {
461461
///
462462
/// Will return `Err` if the underlying SQLite close call fails.
463463
pub async fn close(self) -> Result<()> {
464-
// Returns true if connection was successfully closed.
465-
let closer = async |s: &Sender<Message>| -> std::result::Result<bool, rusqlite::Error> {
466-
let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();
467-
if s.send(Message::Close(sender)).is_err() {
468-
// If the channel is closed on the other side, it means the connection closed successfully
469-
// This is a safeguard against calling close on a `Copy` of the connection
470-
return Ok(false);
471-
}
472-
473-
let Ok(result) = receiver.await else {
474-
// If we get a RecvError at this point, it also means the channel closed in the meantime
475-
// we can assume the connection is closed
476-
return Ok(false);
477-
};
478-
479-
// Return the error from `conn.close()` if any.
480-
result?;
481-
482-
return Ok(true);
483-
};
464+
let _ = self.writer.send(Message::Terminate);
465+
while self.reader.send(Message::Terminate).is_ok() {
466+
// Continue to close readers while the channel is alive.
467+
}
484468

485469
let mut errors = vec![];
486-
if let Err(err) = closer(&self.writer).await {
487-
errors.push(Error::Close(err));
488-
};
489-
490-
loop {
491-
match closer(&self.reader).await {
492-
Ok(closed) => {
493-
if !closed {
494-
break;
495-
}
496-
}
497-
Err(err) => {
498-
errors.push(Error::Close(err));
499-
}
500-
}
470+
let conns: Vec<_> = std::mem::take(&mut self.conns.0.write());
471+
for conn in conns {
472+
if let Err((_, err)) = conn.close() {
473+
errors.push(err);
474+
};
501475
}
502476

503477
if !errors.is_empty() {
504-
warn!("Closing connection: {errors:?}");
505-
return Err(errors.swap_remove(0));
478+
debug!("Closing connection: {errors:?}");
479+
return Err(Error::Close(errors.swap_remove(0)));
506480
}
507481

508482
return Ok(());
@@ -516,8 +490,6 @@ impl Debug for Connection {
516490
}
517491

518492
fn event_loop(id: usize, conns: Arc<LockedConnections>, receiver: Receiver<Message>) {
519-
const BUG_TEXT: &str = "bug in trailbase-sqlite, please report";
520-
521493
while let Ok(message) = receiver.recv() {
522494
match message {
523495
Message::RunConst(f) => {
@@ -528,20 +500,7 @@ fn event_loop(id: usize, conns: Arc<LockedConnections>, receiver: Receiver<Messa
528500
let mut lock = conns.0.write();
529501
f(&mut lock[0])
530502
}
531-
Message::Close(ch) => {
532-
let mut lock = conns.0.write();
533-
let conns: &mut Vec<rusqlite::Connection> = &mut lock;
534-
if conns.is_empty() {
535-
break;
536-
}
537-
538-
let conn = conns.swap_remove(0);
539-
540-
match conn.close() {
541-
Ok(v) => ch.send(Ok(v)).expect(BUG_TEXT),
542-
Err((_conn, e)) => ch.send(Err(e)).expect(BUG_TEXT),
543-
};
544-
503+
Message::Terminate => {
545504
return;
546505
}
547506
};

trailbase-sqlite/src/tests.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,22 @@ async fn close_failure_test() {
148148
.await
149149
.unwrap();
150150

151-
assert!(match conn.close().await.unwrap_err() {
152-
crate::Error::Close(e) => {
153-
e == rusqlite::Error::SqliteFailure(
154-
ffi::Error {
155-
code: ErrorCode::DatabaseBusy,
156-
extended_code: 5,
157-
},
158-
Some("unable to close due to unfinalized statements or unfinished backups".to_string()),
159-
)
160-
}
161-
_ => false,
162-
});
151+
let err = conn.close().await.unwrap_err();
152+
assert!(
153+
match &err {
154+
crate::Error::Close(e) => {
155+
*e == rusqlite::Error::SqliteFailure(
156+
ffi::Error {
157+
code: ErrorCode::DatabaseBusy,
158+
extended_code: 5,
159+
},
160+
Some("unable to close due to unfinalized statements or unfinished backups".to_string()),
161+
)
162+
}
163+
_ => false,
164+
},
165+
"Error: {err:?}"
166+
);
163167
}
164168

165169
#[tokio::test]

0 commit comments

Comments
 (0)