Skip to content

Commit 4487d30

Browse files
committed
Put early queue work and apalis deps behind a "queue" feature.
1 parent 2908e9f commit 4487d30

File tree

6 files changed

+115
-19
lines changed

6 files changed

+115
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ members = [
1515
]
1616
default-members = [
1717
"client/trailbase-rs",
18-
"trailbase-apalis",
1918
"trailbase-assets",
2019
"trailbase-cli",
2120
"trailbase-core",

trailbase-core/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ harness = false
2121

2222
[features]
2323
default = ["v8"]
24-
v8 = ["dep:rustyscript", ]
24+
v8 = ["dep:rustyscript"]
25+
queue = ["dep:apalis", "dep:trailbase-apalis"]
2526

2627
[dependencies]
28+
apalis = { version = "0.7.0", optional = true, default-features = false }
2729
arc-swap = "1.7.1"
2830
argon2 = { version = "^0.5.3", default-features = false, features = ["alloc", "password-hash"] }
2931
askama = { workspace = true }
@@ -78,7 +80,7 @@ tower-http = { version = "^0.6.0", default-features = false, features = ["cors",
7880
tower-service = { version = "0.3.3", default-features = false }
7981
tracing = { version = "0.1.40", default-features = false }
8082
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["smallvec", "std", "fmt", "json"] }
81-
trailbase-apalis = { workspace = true }
83+
trailbase-apalis = { workspace = true, optional = true }
8284
trailbase-assets = { workspace = true }
8385
trailbase-extension = { workspace = true }
8486
trailbase-refinery-core = { workspace = true }

trailbase-core/src/app_state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::config::{validate_config, write_config_and_vault_textproto};
1010
use crate::data_dir::DataDir;
1111
use crate::email::Mailer;
1212
use crate::js::RuntimeHandle;
13-
use crate::queue::QueueStorage;
13+
use crate::queue::Queue;
1414
use crate::records::RecordApi;
1515
use crate::records::subscribe::SubscriptionManager;
1616
use crate::scheduler::{JobRegistry, build_job_registry_from_config};
@@ -34,7 +34,7 @@ struct InternalState {
3434

3535
conn: trailbase_sqlite::Connection,
3636
logs_conn: trailbase_sqlite::Connection,
37-
queue: QueueStorage,
37+
queue: Queue,
3838

3939
jwt: JwtHelper,
4040

@@ -59,7 +59,7 @@ pub(crate) struct AppStateArgs {
5959
pub config: Config,
6060
pub conn: trailbase_sqlite::Connection,
6161
pub logs_conn: trailbase_sqlite::Connection,
62-
pub queue: QueueStorage,
62+
pub queue: Queue,
6363
pub jwt: JwtHelper,
6464
pub object_store: Box<dyn ObjectStore + Send + Sync>,
6565
pub js_runtime_threads: Option<usize>,
@@ -186,7 +186,7 @@ impl AppState {
186186
return &self.state.logs_conn;
187187
}
188188

189-
pub fn queue(&self) -> &QueueStorage {
189+
pub fn queue(&self) -> &Queue {
190190
return &self.state.queue;
191191
}
192192

@@ -441,7 +441,7 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
441441
config,
442442
conn: conn.clone(),
443443
logs_conn,
444-
queue: crate::queue::init_queue_storage(None).await.unwrap(),
444+
queue: Queue::new(None).await.unwrap(),
445445
jwt: jwt::test_jwt_helper(),
446446
table_metadata: table_metadata.clone(),
447447
subscription_manager: SubscriptionManager::new(conn.clone(), table_metadata, record_apis),

trailbase-core/src/queue.rs

Lines changed: 104 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use serde::{Deserialize, Serialize};
21
use thiserror::Error;
3-
use trailbase_apalis::sqlite::SqliteStorage;
42

53
use crate::data_dir::DataDir;
64

@@ -10,18 +8,71 @@ pub enum QueueError {
108
SqliteExtension(#[from] trailbase_extension::Error),
119
#[error("SQLite error: {0}")]
1210
Sqlite(#[from] trailbase_sqlite::Error),
11+
#[error("IO error: {0}")]
12+
IO(#[from] std::io::Error),
1313
}
1414

15-
#[derive(Debug, Serialize, Deserialize)]
16-
pub enum Job {
17-
SendEmail(),
15+
#[cfg(feature = "queue")]
16+
pub(crate) mod queue_impl {
17+
use log::*;
18+
use serde::{Deserialize, Serialize};
19+
20+
use super::QueueError;
21+
22+
#[derive(Debug, Serialize, Deserialize)]
23+
pub enum Job {
24+
Something(),
25+
}
26+
27+
pub async fn handle_job(job: Job) -> Result<(), QueueError> {
28+
match job {
29+
Job::Something() => {
30+
info!("Queue got something");
31+
}
32+
}
33+
34+
return Ok(());
35+
}
36+
37+
#[cfg(feature = "queue")]
38+
pub(crate) type QueueStorage = trailbase_apalis::sqlite::SqliteStorage<Job>;
1839
}
1940

20-
pub type QueueStorage = SqliteStorage<Job>;
41+
#[derive(Clone)]
42+
pub struct Queue {
43+
#[cfg(feature = "queue")]
44+
pub(crate) storage: queue_impl::QueueStorage,
45+
}
46+
47+
impl Queue {
48+
#[allow(unused)]
49+
pub(crate) async fn new(data_dir: Option<&DataDir>) -> Result<Self, QueueError> {
50+
return Ok(Self {
51+
#[cfg(feature = "queue")]
52+
storage: init_queue_storage(data_dir).await?,
53+
});
54+
}
55+
56+
#[cfg(feature = "queue")]
57+
#[allow(unused)]
58+
pub(crate) async fn run(&self) -> Result<(), QueueError> {
59+
use apalis::prelude::*;
60+
61+
let monitor = Monitor::new().register({
62+
WorkerBuilder::new("default-worker")
63+
// .enable_tracing()
64+
.backend(self.storage.clone())
65+
.build_fn(queue_impl::handle_job)
66+
});
67+
68+
return Ok(monitor.run().await?);
69+
}
70+
}
2171

72+
#[cfg(feature = "queue")]
2273
pub(crate) async fn init_queue_storage(
2374
data_dir: Option<&DataDir>,
24-
) -> Result<QueueStorage, QueueError> {
75+
) -> Result<queue_impl::QueueStorage, QueueError> {
2576
let queue_path = data_dir.map(|d| d.queue_db_path());
2677
let conn = trailbase_sqlite::Connection::new(
2778
|| -> Result<_, trailbase_sqlite::Error> {
@@ -34,8 +85,51 @@ pub(crate) async fn init_queue_storage(
3485
None,
3586
)?;
3687

37-
SqliteStorage::setup(&conn).await?;
88+
trailbase_apalis::sqlite::SqliteStorage::setup(&conn).await?;
89+
90+
let config = trailbase_apalis::Config::new("ns::trailbase");
91+
return Ok(queue_impl::QueueStorage::new_with_config(conn, config));
92+
}
93+
94+
#[cfg(test)]
95+
#[cfg(feature = "queue")]
96+
mod tests {
97+
use super::queue_impl::*;
98+
use super::*;
99+
100+
use apalis::prelude::*;
101+
102+
#[tokio::test]
103+
async fn test_queue() {
104+
let mut queue = Queue::new(None).await.unwrap();
105+
106+
let (sender, receiver) = async_channel::unbounded::<()>();
107+
108+
let storage = queue.storage.clone();
109+
let _ = tokio::spawn(async move {
110+
let monitor = Monitor::new().register({
111+
WorkerBuilder::new("default-worker")
112+
.data(sender)
113+
.backend(storage)
114+
.build_fn(
115+
async |job: Job, sender: Data<async_channel::Sender<()>>| -> Result<(), QueueError> {
116+
match job {
117+
Job::Something() => sender.send(()).await.unwrap(),
118+
}
119+
120+
return Ok(());
121+
},
122+
)
123+
});
124+
125+
return monitor.run().await;
126+
});
127+
128+
let job = queue.storage.push(Job::Something()).await.unwrap();
129+
130+
let entry = queue.storage.fetch_by_id(&job.task_id).await.unwrap();
131+
assert!(entry.is_some());
38132

39-
let config = trailbase_apalis::Config::new("apalis::test");
40-
return Ok(SqliteStorage::new_with_config(conn, config));
133+
receiver.recv().await.unwrap();
134+
}
41135
}

trailbase-core/src/server/init.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub async fn init_app_state(
5858
let logs_conn = crate::connection::init_logs_db(Some(&data_dir))?;
5959

6060
// TODO: At this early stage we're using an in-memory db. Go persistent before rolling out.
61-
let queue = crate::queue::init_queue_storage(None).await?;
61+
let queue = crate::queue::Queue::new(None).await?;
6262

6363
// Open or init the main db. Note that we derive whether a new DB was initialized based on
6464
// whether the V1 migration had to be applied. Should be fairly robust.

0 commit comments

Comments
 (0)