diff --git a/crates/api/src/redis.rs b/crates/api/src/redis.rs index 95352d5..0a8b045 100644 --- a/crates/api/src/redis.rs +++ b/crates/api/src/redis.rs @@ -1,5 +1,13 @@ -use r2d2_redis::{r2d2, RedisConnectionManager}; +use common::types::order::{NewOrder, OrderResponse}; +use futures::future::FutureExt; +use r2d2_redis::{ + r2d2, + redis::{Commands, ErrorKind, RedisError}, + RedisConnectionManager, +}; +use serde_json::{json, Value}; use std::env; +use tokio::sync::oneshot; pub type RedisPool = r2d2::Pool; @@ -10,3 +18,50 @@ pub fn initialize_redis_pool() -> RedisPool { .build(manager) .expect("Failed to create pool") } + +pub struct RedisClient; + +impl RedisClient { + pub fn place_order_and_wait( + pool: &RedisPool, + order: NewOrder, + ) -> impl futures::Future> { + let (tx, rx) = oneshot::channel(); + let client_order_id = uuid::Uuid::new_v4().to_string(); + let mut order = order; + order.client_order_id = Some(client_order_id.clone()); + let order_json = serde_json::to_string(&order).unwrap(); + + let pool_clone = pool.clone(); + + tokio::spawn(async move { + let mut conn = pool_clone.get().expect("Failed to get connection"); + let mut pubsub = conn.as_pubsub(); + pubsub.subscribe(&client_order_id).unwrap(); + + let mut conn_clone = pool_clone.get().expect("Failed to get connection"); + let _: () = conn_clone + .lpush(format!("orders@{}", order.symbol), order_json) + .expect("Failed to push order to queue"); + + match pubsub.get_message() { + Ok(msg) => { + let payload: String = msg.get_payload().unwrap(); + let payload_json: OrderResponse = serde_json::from_str(&payload).unwrap(); + pubsub.unsubscribe(&client_order_id).unwrap(); + tx.send(payload_json).unwrap(); + } + Err(e) => { + eprintln!("Error receiving message: {:?}", e); + } + } + }); + + rx.map(|result| { + result.map_err(|e| { + RedisError::from((ErrorKind::IoError, "Oneshot channel error", e.to_string())) + }) + }) + .boxed() + } +} diff --git a/crates/api/src/route/order.rs b/crates/api/src/route/order.rs index ec8937e..1227df2 100644 --- a/crates/api/src/route/order.rs +++ b/crates/api/src/route/order.rs @@ -1,20 +1,52 @@ -use actix_web::{web, HttpResponse, Responder}; +use actix_web::{web, HttpMessage, HttpRequest, HttpResponse, Responder, Result}; use serde_json::json; -use crate::{middlewares::extract_client_id::ExtractClientId, types::AppState}; +use crate::{ + middlewares::{ + extract_client_id::{ExtractClientId, IdKey}, + verify_user::VerifyUser, + }, + redis::RedisClient, + types::AppState, +}; use common::types::order::NewOrder; async fn create_new_order( app_state: web::Data, order: web::Json, -) -> impl Responder { - HttpResponse::Ok().json(json!("Create order")) + req: HttpRequest, +) -> Result { + let id_key = req.extensions().get::().cloned(); + match id_key { + Some(id) => { + let order = order.into_inner(); + let pool = app_state.redis_pool.clone(); + let order_res = RedisClient::place_order_and_wait(&pool, order).await; + match order_res { + Ok(res) => Ok(HttpResponse::Ok().json(json!({ + "status": "ok", + "message": "Order created successfully", + "order": res + }))), + Err(_) => Ok(HttpResponse::InternalServerError().json(json!({ + "status": "error", + "message": "Error creating order" + }))), + } + } + None => Ok(HttpResponse::BadRequest().json(json!({ + "status": "error", + "message": "Invalid token" + }))), + } } pub fn order_config(cfg: &mut web::ServiceConfig, app_state: web::Data) { + let verify_user_middle = VerifyUser::new(app_state.clone()); cfg.service( web::scope("/orders") + .wrap(verify_user_middle) .wrap(ExtractClientId) .service(web::resource("/").route(web::post().to(create_new_order))), ); diff --git a/crates/common/src/types/order.rs b/crates/common/src/types/order.rs index c5829f9..e938d92 100644 --- a/crates/common/src/types/order.rs +++ b/crates/common/src/types/order.rs @@ -38,6 +38,7 @@ pub enum OrderStatus { pub struct NewOrder { pub symbol: String, pub side: OrderSide, + #[serde(rename = "type")] pub type_: OrderType, pub time_in_force: TimeInForce, pub quantity: f64, @@ -45,4 +46,29 @@ pub struct NewOrder { pub price: f64, pub stop_price: f64, pub timestamp: i64, + pub client_order_id: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OrderResponse { + pub symbol: String, + pub order_id: u64, + pub client_order_id: String, + pub transact_time: u64, + pub price: f64, + pub orig_qty: f64, + pub executed_ty: f64, + pub status: OrderStatus, + pub time_in_force: TimeInForce, + #[serde(rename = "type")] + pub type_: OrderType, + pub side: OrderSide, + pub fills: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Fill { + pub price: f64, + pub qty: f64, + pub commission: f64, }