Skip to content

Commit

Permalink
SQLite worker queue implementation (#969)
Browse files Browse the repository at this point in the history
* add sqlite worker_queue implementation

* add backticks for clippy

---------

Co-authored-by: Dotan J. Nahum <[email protected]>
  • Loading branch information
isaacdonaldson and jondot authored Nov 7, 2024
1 parent 2335285 commit 9879c2c
Show file tree
Hide file tree
Showing 4 changed files with 541 additions and 25 deletions.
59 changes: 35 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["auth_jwt", "cli", "with-db", "cache_inmem", "bg_redis", "bg_pg"]
default = [
"auth_jwt",
"cli",
"with-db",
"cache_inmem",
"bg_redis",
"bg_pg",
"bg_sqlt",
]
auth_jwt = ["dep:jsonwebtoken"]
cli = ["dep:clap"]
testing = ["dep:axum-test"]
Expand All @@ -37,6 +45,7 @@ storage_gcp = ["object_store/gcp"]
cache_inmem = ["dep:moka"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
bg_pg = ["dep:sqlx", "dep:ulid"]
bg_sqlt = ["dep:sqlx", "dep:ulid"]

[dependencies]
loco-gen = { version = "0.12.0", path = "./loco-gen" }
Expand All @@ -48,10 +57,10 @@ colored = "2"


sea-orm = { version = "1.1.0", features = [
"sqlx-postgres", # `DATABASE_DRIVER` feature
"sqlx-sqlite",
"runtime-tokio-rustls",
"macros",
"sqlx-postgres", # `DATABASE_DRIVER` feature
"sqlx-sqlite",
"runtime-tokio-rustls",
"macros",
], optional = true }

tokio = { version = "1.33.0", default-features = false }
Expand All @@ -75,10 +84,10 @@ fs-err = "2.11.0"
tera = "1.19.1"
heck = "0.4.0"
lettre = { version = "0.11.4", default-features = false, features = [
"builder",
"hostname",
"smtp-transport",
"tokio1-rustls-tls",
"builder",
"hostname",
"smtp-transport",
"tokio1-rustls-tls",
] }
include_dir = "0.7.3"
thiserror = { workspace = true }
Expand Down Expand Up @@ -125,9 +134,11 @@ moka = { version = "0.12.7", features = ["sync"], optional = true }
tokio-cron-scheduler = { version = "0.11.0", features = ["signal"] }
english-to-cron = { version = "0.1.2" }

# bg_sqlt: sqlite workers
# bg_pg: postgres workers
sqlx = { version = "0.8.2", default-features = false, features = [
"postgres",
"postgres",
"sqlite",
], optional = true }
ulid = { version = "1", optional = true }

Expand All @@ -147,26 +158,26 @@ async-trait = { version = "0.1.74" }
axum = { version = "0.7.5", features = ["macros"] }
tower = "0.4"
tower-http = { version = "0.6.1", features = [
"trace",
"catch-panic",
"timeout",
"add-extension",
"cors",
"fs",
"set-header",
"compression-full",
"trace",
"catch-panic",
"timeout",
"add-extension",
"cors",
"fs",
"set-header",
"compression-full",
] }

[dependencies.sea-orm-migration]
optional = true
version = "1.0.0"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
# e.g.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature
"sqlx-sqlite",
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
# e.g.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature
"sqlx-sqlite",
]

[package.metadata.docs.rs]
Expand Down
67 changes: 66 additions & 1 deletion src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ use tracing::{debug, error};
pub mod pg;
#[cfg(feature = "bg_redis")]
pub mod skq;
#[cfg(feature = "bg_sqlt")]
pub mod sqlt;

use crate::{
app::AppContext,
config::{self, Config, PostgresQueueConfig, QueueConfig, RedisQueueConfig, WorkerMode},
config::{
self, Config, PostgresQueueConfig, QueueConfig, RedisQueueConfig, SqliteQueueConfig,
WorkerMode,
},
Error, Result,
};

Expand All @@ -29,6 +34,12 @@ pub enum Queue {
std::sync::Arc<tokio::sync::Mutex<pg::TaskRegistry>>,
pg::RunOpts,
),
#[cfg(feature = "bg_sqlt")]
Sqlite(
sqlt::SqlitePool,
std::sync::Arc<tokio::sync::Mutex<sqlt::TaskRegistry>>,
sqlt::RunOpts,
),
None,
}

Expand Down Expand Up @@ -62,6 +73,18 @@ impl Queue {
.await
.map_err(Box::from)?;
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => {
sqlt::enqueue(
pool,
&class,
serde_json::to_value(args)?,
chrono::Utc::now(),
None,
)
.await
.map_err(Box::from)?;
}
_ => {}
}
Ok(())
Expand Down Expand Up @@ -91,6 +114,11 @@ impl Queue {
let mut r = registry.lock().await;
r.register_worker(W::class_name(), worker)?;
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(_, registry, _) => {
let mut r = registry.lock().await;
r.register_worker(W::class_name(), worker)?;
}
_ => {}
}
Ok(())
Expand All @@ -116,6 +144,14 @@ impl Queue {
handle.await?;
}
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, registry, run_opts) => {
//TODOQ: num workers to config
let handles = registry.lock().await.run(pool, run_opts);
for handle in handles {
handle.await?;
}
}
_ => {
error!(
"no queue provider is configured: compile with at least one queue provider \
Expand All @@ -140,6 +176,10 @@ impl Queue {
Self::Postgres(pool, _, _) => {
pg::initialize_database(pool).await.map_err(Box::from)?;
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => {
sqlt::initialize_database(pool).await.map_err(Box::from)?;
}
_ => {}
}
Ok(())
Expand All @@ -161,6 +201,10 @@ impl Queue {
Self::Postgres(pool, _, _) => {
pg::clear(pool).await.map_err(Box::from)?;
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => {
sqlt::clear(pool).await.map_err(Box::from)?;
}
_ => {}
}
Ok(())
Expand All @@ -182,6 +226,10 @@ impl Queue {
Self::Postgres(pool, _, _) => {
pg::ping(pool).await.map_err(Box::from)?;
}
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => {
sqlt::ping(pool).await.map_err(Box::from)?;
}
_ => {}
}
Ok(())
Expand All @@ -194,6 +242,8 @@ impl Queue {
Self::Redis(_, _, _) => "redis queue".to_string(),
#[cfg(feature = "bg_pg")]
Self::Postgres(_, _, _) => "postgres queue".to_string(),
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(_, _, _) => "sqlite queue".to_string(),
_ => "no queue".to_string(),
}
}
Expand Down Expand Up @@ -286,6 +336,17 @@ pub async fn converge(queue: &Queue, config: &QueueConfig) -> Result<()> {
num_workers: _,
min_connections: _,
})
| QueueConfig::Sqlite(SqliteQueueConfig {
dangerously_flush,
uri: _,
max_connections: _,
enable_logging: _,
connect_timeout: _,
idle_timeout: _,
poll_interval_sec: _,
num_workers: _,
min_connections: _,
})
| QueueConfig::Redis(RedisQueueConfig {
dangerously_flush,
uri: _,
Expand Down Expand Up @@ -319,6 +380,10 @@ pub async fn create_queue_provider(config: &Config) -> Result<Option<Arc<Queue>>
config::QueueConfig::Postgres(qcfg) => {
Ok(Some(Arc::new(pg::create_provider(qcfg).await?)))
}
#[cfg(feature = "bg_sqlt")]
config::QueueConfig::Sqlite(qcfg) => {
Ok(Some(Arc::new(sqlt::create_provider(qcfg).await?)))
}

#[allow(unreachable_patterns)]
_ => Err(Error::string(
Expand Down
Loading

0 comments on commit 9879c2c

Please sign in to comment.