Skip to content

Commit

Permalink
Merge pull request #37 from 8shaws/engine
Browse files Browse the repository at this point in the history
chore: init engine and docker
  • Loading branch information
shawakash authored Aug 3, 2024
2 parents 8649501 + 5a37bc7 commit 350350f
Show file tree
Hide file tree
Showing 18 changed files with 279 additions and 54 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/engine_build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Deploy xchange_engine

on:
push:
branches:
- main
paths:
- "crates/engine/**"
pull_request:
branches:
- main
paths:
- "crates/engine/**"
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: "mv docker/engine/Dockerfile ."

- name: docker login
env:
DOCKER_USER: ${{secrets.DOCKERHUB_USERNAME}}
DOCKER_PASSWORD: ${{secrets.DOCKERHUB_TOKEN}}
run: |
docker login -u $DOCKER_USER -p $DOCKER_PASSWORD
- name: Build the Docker image
run: docker build . --tag shawakash/xchange_engine:${{ github.sha }}

- name: Docker Push
run: docker push shawakash/xchange_engine:${{ github.sha }}
20 changes: 18 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ diesel = { version = "2.2.0", features = ["postgres", "chrono", "r2d2", "uuid"]
dotenvy = "0.15"
chrono = { version = "0.4.19", features = ["serde"] }
uuid = { version = "1.10.0", features = ["v4", "serde"] }
r2d2_redis = "0.14.0"
2 changes: 2 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod redis;
pub mod types;
pub mod utils;
12 changes: 12 additions & 0 deletions crates/common/src/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use r2d2_redis::{r2d2, RedisConnectionManager};
use std::env;

pub type RedisPool = r2d2::Pool<RedisConnectionManager>;

pub fn initialize_redis_pool() -> RedisPool {
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let manager = RedisConnectionManager::new(redis_url).unwrap();
r2d2::Pool::builder()
.build(manager)
.expect("Failed to create pool")
}
29 changes: 29 additions & 0 deletions crates/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use chrono;
use r2d2_redis::redis;
use std::collections::HashMap;

pub fn current_time() -> String {
chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
}

pub fn get_queue_with_max_length(conn: &mut redis::Connection, queues: &[&str]) -> Option<String> {
let mut queue_lengths = HashMap::new();

for queue in queues {
let len_result: redis::RedisResult<usize> =
redis::cmd("LLEN").arg(*queue).query(&mut *conn);
match len_result {
Ok(len) => {
queue_lengths.insert(queue.to_string(), len);
}
Err(err) => {
println!("Failed to get length of {}: {}", queue, err);
}
}
}

queue_lengths
.into_iter()
.max_by_key(|&(_, len)| len)
.map(|(queue, _)| queue)
}
32 changes: 32 additions & 0 deletions crates/engine/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Include any files or directories that you don't want to be copied to your
# container here (e.g., local build artifacts, temporary files, etc.).
#
# For more help, visit the .dockerignore file reference guide at
# https://docs.docker.com/engine/reference/builder/#dockerignore-file

**/.DS_Store
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/charts
**/docker-compose*
**/compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/secrets.dev.yaml
**/values.dev.yaml
/bin
/target
LICENSE
README.md
14 changes: 14 additions & 0 deletions crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "engine"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4.38"
common = {path = "../common"}
dotenvy = "0.15.7"
r2d2 = "0.8.10"
r2d2_redis = "0.14.0"
serde = "1.0.204"
serde_json = "1.0.122"
tokio = { version = "1.39.2", features = ["full"] }
49 changes: 49 additions & 0 deletions crates/engine/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use dotenvy::dotenv;
use std::sync::Arc;
use tokio;
use tokio::signal;

mod process;

use common::redis::initialize_redis_pool;
use process::handle_process;

#[tokio::main]
async fn main() {
dotenv().ok();

let no_worker_threads = std::env::var("NO_WORKER_THREADS")
.unwrap_or_else(|_| "5".to_string())
.parse::<usize>()
.expect("NO_WORKER_THREADS must be a positive integer");

let queues = vec!["user_email_verify"];

let pool = Arc::new(initialize_redis_pool());

let mut worker_handlers = vec![];

for _ in 0..no_worker_threads {
let pool_clone = Arc::clone(&pool);
let queues_clone = queues.clone();
let worker_handle = tokio::spawn(async move {
handle_process(pool_clone, queues_clone).await;
});
worker_handlers.push(worker_handle);
}

let shutdown_signal = async {
signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
println!(
"{}: Shutdown signal received",
common::utils::current_time()
);
};

shutdown_signal.await;

for handle in worker_handlers {
handle.abort();
}
println!("{}: Shutting down...", common::utils::current_time());
}
40 changes: 40 additions & 0 deletions crates/engine/src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use dotenvy::dotenv;
use std::sync::Arc;
use std::thread;
use tokio::time::{sleep, Duration};

use common::redis::RedisPool;
use common::utils::current_time;
use common::utils::get_queue_with_max_length;

pub async fn handle_process(conn: Arc<RedisPool>, queues: Vec<&str>) {
dotenv().ok();

let thread_id = format!("{:?}", thread::current().id());
loop {
let con_result = conn.get();

match con_result {
Ok(mut conn) => {
if let Some(max_queue) = get_queue_with_max_length(&mut conn, &queues) {
} else {
println!(
"{}: Worker {}: All queues are empty, waiting...",
current_time(),
thread_id
);
sleep(Duration::from_secs(2)).await;
}
}
Err(err) => {
println!(
"{}: Worker {}: Failed to get Redis connection: {}",
current_time(),
thread_id,
err
);
sleep(Duration::from_secs(2)).await;
}
}
}
}
1 change: 1 addition & 0 deletions crates/notif_worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
common = { path = "../common" }
base64 = "0.22.1"
chrono = "0.4.38"
dotenvy = "0.15.7"
Expand Down
9 changes: 6 additions & 3 deletions crates/notif_worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ mod process;
mod types;
mod utils;

use common::redis::initialize_redis_pool;
use process::handle_process;
use utils::initialize_redis_pool;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -36,13 +36,16 @@ async fn main() {

let shutdown_signal = async {
signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
println!("{}: Shutdown signal received", utils::current_time());
println!(
"{}: Shutdown signal received",
common::utils::current_time()
);
};

shutdown_signal.await;

for handle in worker_handlers {
handle.abort();
}
println!("{}: Shutting down...", utils::current_time());
println!("{}: Shutting down...", common::utils::current_time());
}
6 changes: 3 additions & 3 deletions crates/notif_worker/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use tokio::time::{sleep, Duration};

mod processes;

use crate::utils::current_time;
use crate::utils::get_queue_with_max_length;
use crate::utils::RedisPool;
use common::redis::RedisPool;
use common::utils::current_time;
use common::utils::get_queue_with_max_length;

pub async fn handle_process(conn: Arc<RedisPool>, queues: Vec<&str>) {
dotenv().ok();
Expand Down
2 changes: 1 addition & 1 deletion crates/notif_worker/src/process/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use serde_json::Value;
use std::thread;
use tokio::time::{sleep, Duration};

use crate::utils::current_time;
use crate::utils::lib::generate_otp;
use crate::utils::lib::send_mail;
use common::utils::current_time;

pub async fn send_email_process(conn: &mut redis::Connection) {
let thread_id = format!("{:?}", thread::current().id());
Expand Down
2 changes: 1 addition & 1 deletion crates/notif_worker/src/utils/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use totp_rs::{Algorithm, TOTP};

use crate::constants::get_email_html;

use super::current_time;
use common::utils::current_time;

pub async fn send_mail(mail: &str, otp: &str, thread_id: &str) {
let smtp_username = env::var("SMTP_USERNAME").expect("SMTP_USERNAME must be set");
Expand Down
42 changes: 0 additions & 42 deletions crates/notif_worker/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1 @@
use chrono;
use r2d2_redis::redis;
use r2d2_redis::{r2d2, RedisConnectionManager};
use std::collections::HashMap;
use std::env;

pub mod lib;

pub type RedisPool = r2d2::Pool<RedisConnectionManager>;

pub fn initialize_redis_pool() -> RedisPool {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
let manager = RedisConnectionManager::new(redis_url).unwrap();
r2d2::Pool::builder()
.build(manager)
.expect("Failed to create pool")
}

pub fn current_time() -> String {
chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
}

pub fn get_queue_with_max_length(conn: &mut redis::Connection, queues: &[&str]) -> Option<String> {
let mut queue_lengths = HashMap::new();

for queue in queues {
let len_result: redis::RedisResult<usize> =
redis::cmd("LLEN").arg(*queue).query(&mut *conn);
match len_result {
Ok(len) => {
queue_lengths.insert(queue.to_string(), len);
}
Err(err) => {
println!("Failed to get length of {}: {}", queue, err);
}
}
}

queue_lengths
.into_iter()
.max_by_key(|&(_, len)| len)
.map(|(queue, _)| queue)
}
Loading

0 comments on commit 350350f

Please sign in to comment.