Skip to content

Commit

Permalink
Merge pull request #36 from 8shaws/orders
Browse files Browse the repository at this point in the history
chore: adds method to push the new order to queue and wait for response from engine
  • Loading branch information
shawakash authored Aug 2, 2024
2 parents 3857b35 + 1879f5f commit 8649501
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 5 deletions.
57 changes: 56 additions & 1 deletion crates/api/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use r2d2_redis::{r2d2, RedisConnectionManager};
use common::types::order::{NewOrder, OrderResponse};
use futures::future::FutureExt;
use r2d2_redis::{
r2d2,
redis::{Commands, ErrorKind, RedisError},
RedisConnectionManager,
};
use serde_json::{json, Value};
use std::env;
use tokio::sync::oneshot;

pub type RedisPool = r2d2::Pool<RedisConnectionManager>;

Expand All @@ -10,3 +18,50 @@ pub fn initialize_redis_pool() -> RedisPool {
.build(manager)
.expect("Failed to create pool")
}

pub struct RedisClient;

impl RedisClient {
pub fn place_order_and_wait(
pool: &RedisPool,
order: NewOrder,
) -> impl futures::Future<Output = Result<OrderResponse, RedisError>> {
let (tx, rx) = oneshot::channel();
let client_order_id = uuid::Uuid::new_v4().to_string();
let mut order = order;
order.client_order_id = Some(client_order_id.clone());
let order_json = serde_json::to_string(&order).unwrap();

let pool_clone = pool.clone();

tokio::spawn(async move {
let mut conn = pool_clone.get().expect("Failed to get connection");
let mut pubsub = conn.as_pubsub();
pubsub.subscribe(&client_order_id).unwrap();

let mut conn_clone = pool_clone.get().expect("Failed to get connection");
let _: () = conn_clone
.lpush(format!("orders@{}", order.symbol), order_json)
.expect("Failed to push order to queue");

match pubsub.get_message() {
Ok(msg) => {
let payload: String = msg.get_payload().unwrap();
let payload_json: OrderResponse = serde_json::from_str(&payload).unwrap();
pubsub.unsubscribe(&client_order_id).unwrap();
tx.send(payload_json).unwrap();
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
}
}
});

rx.map(|result| {
result.map_err(|e| {
RedisError::from((ErrorKind::IoError, "Oneshot channel error", e.to_string()))
})
})
.boxed()
}
}
40 changes: 36 additions & 4 deletions crates/api/src/route/order.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
use actix_web::{web, HttpResponse, Responder};
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse, Responder, Result};
use serde_json::json;

use crate::{middlewares::extract_client_id::ExtractClientId, types::AppState};
use crate::{
middlewares::{
extract_client_id::{ExtractClientId, IdKey},
verify_user::VerifyUser,
},
redis::RedisClient,
types::AppState,
};

use common::types::order::NewOrder;

async fn create_new_order(
app_state: web::Data<AppState>,
order: web::Json<NewOrder>,
) -> impl Responder {
HttpResponse::Ok().json(json!("Create order"))
req: HttpRequest,
) -> Result<impl Responder> {
let id_key = req.extensions().get::<IdKey>().cloned();
match id_key {
Some(id) => {
let order = order.into_inner();
let pool = app_state.redis_pool.clone();
let order_res = RedisClient::place_order_and_wait(&pool, order).await;
match order_res {
Ok(res) => Ok(HttpResponse::Ok().json(json!({
"status": "ok",
"message": "Order created successfully",
"order": res
}))),
Err(_) => Ok(HttpResponse::InternalServerError().json(json!({
"status": "error",
"message": "Error creating order"
}))),
}
}
None => Ok(HttpResponse::BadRequest().json(json!({
"status": "error",
"message": "Invalid token"
}))),
}
}

pub fn order_config(cfg: &mut web::ServiceConfig, app_state: web::Data<AppState>) {
let verify_user_middle = VerifyUser::new(app_state.clone());
cfg.service(
web::scope("/orders")
.wrap(verify_user_middle)
.wrap(ExtractClientId)
.service(web::resource("/").route(web::post().to(create_new_order))),
);
Expand Down
26 changes: 26 additions & 0 deletions crates/common/src/types/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,37 @@ pub enum OrderStatus {
pub struct NewOrder {
pub symbol: String,
pub side: OrderSide,
#[serde(rename = "type")]
pub type_: OrderType,
pub time_in_force: TimeInForce,
pub quantity: f64,
pub quote_order_qty: f64,
pub price: f64,
pub stop_price: f64,
pub timestamp: i64,
pub client_order_id: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OrderResponse {
pub symbol: String,
pub order_id: u64,
pub client_order_id: String,
pub transact_time: u64,
pub price: f64,
pub orig_qty: f64,
pub executed_ty: f64,
pub status: OrderStatus,
pub time_in_force: TimeInForce,
#[serde(rename = "type")]
pub type_: OrderType,
pub side: OrderSide,
pub fills: Vec<Fill>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Fill {
pub price: f64,
pub qty: f64,
pub commission: f64,
}

0 comments on commit 8649501

Please sign in to comment.