Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adds method to push the new order to queue and wait for response from engine #36

Merged
merged 4 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion crates/api/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use r2d2_redis::{r2d2, RedisConnectionManager};
use common::types::order::{NewOrder, OrderResponse};
use futures::future::FutureExt;
use r2d2_redis::{
r2d2,
redis::{Commands, ErrorKind, RedisError},
RedisConnectionManager,
};
use serde_json::{json, Value};
use std::env;
use tokio::sync::oneshot;

pub type RedisPool = r2d2::Pool<RedisConnectionManager>;

Expand All @@ -10,3 +18,50 @@ pub fn initialize_redis_pool() -> RedisPool {
.build(manager)
.expect("Failed to create pool")
}

pub struct RedisClient;

impl RedisClient {
pub fn place_order_and_wait(
pool: &RedisPool,
order: NewOrder,
) -> impl futures::Future<Output = Result<OrderResponse, RedisError>> {
let (tx, rx) = oneshot::channel();
let client_order_id = uuid::Uuid::new_v4().to_string();
let mut order = order;
order.client_order_id = Some(client_order_id.clone());
let order_json = serde_json::to_string(&order).unwrap();

let pool_clone = pool.clone();

tokio::spawn(async move {
let mut conn = pool_clone.get().expect("Failed to get connection");
let mut pubsub = conn.as_pubsub();
pubsub.subscribe(&client_order_id).unwrap();

let mut conn_clone = pool_clone.get().expect("Failed to get connection");
let _: () = conn_clone
.lpush(format!("orders@{}", order.symbol), order_json)
.expect("Failed to push order to queue");

match pubsub.get_message() {
Ok(msg) => {
let payload: String = msg.get_payload().unwrap();
let payload_json: OrderResponse = serde_json::from_str(&payload).unwrap();
pubsub.unsubscribe(&client_order_id).unwrap();
tx.send(payload_json).unwrap();
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
}
}
});

rx.map(|result| {
result.map_err(|e| {
RedisError::from((ErrorKind::IoError, "Oneshot channel error", e.to_string()))
})
})
.boxed()
}
}
40 changes: 36 additions & 4 deletions crates/api/src/route/order.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
use actix_web::{web, HttpResponse, Responder};
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse, Responder, Result};
use serde_json::json;

use crate::{middlewares::extract_client_id::ExtractClientId, types::AppState};
use crate::{
middlewares::{
extract_client_id::{ExtractClientId, IdKey},
verify_user::VerifyUser,
},
redis::RedisClient,
types::AppState,
};

use common::types::order::NewOrder;

async fn create_new_order(
app_state: web::Data<AppState>,
order: web::Json<NewOrder>,
) -> impl Responder {
HttpResponse::Ok().json(json!("Create order"))
req: HttpRequest,
) -> Result<impl Responder> {
let id_key = req.extensions().get::<IdKey>().cloned();
match id_key {
Some(id) => {
let order = order.into_inner();
let pool = app_state.redis_pool.clone();
let order_res = RedisClient::place_order_and_wait(&pool, order).await;
match order_res {
Ok(res) => Ok(HttpResponse::Ok().json(json!({
"status": "ok",
"message": "Order created successfully",
"order": res
}))),
Err(_) => Ok(HttpResponse::InternalServerError().json(json!({
"status": "error",
"message": "Error creating order"
}))),
}
}
None => Ok(HttpResponse::BadRequest().json(json!({
"status": "error",
"message": "Invalid token"
}))),
}
}

pub fn order_config(cfg: &mut web::ServiceConfig, app_state: web::Data<AppState>) {
let verify_user_middle = VerifyUser::new(app_state.clone());
cfg.service(
web::scope("/orders")
.wrap(verify_user_middle)
.wrap(ExtractClientId)
.service(web::resource("/").route(web::post().to(create_new_order))),
);
Expand Down
26 changes: 26 additions & 0 deletions crates/common/src/types/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,37 @@ pub enum OrderStatus {
pub struct NewOrder {
pub symbol: String,
pub side: OrderSide,
#[serde(rename = "type")]
pub type_: OrderType,
pub time_in_force: TimeInForce,
pub quantity: f64,
pub quote_order_qty: f64,
pub price: f64,
pub stop_price: f64,
pub timestamp: i64,
pub client_order_id: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OrderResponse {
pub symbol: String,
pub order_id: u64,
pub client_order_id: String,
pub transact_time: u64,
pub price: f64,
pub orig_qty: f64,
pub executed_ty: f64,
pub status: OrderStatus,
pub time_in_force: TimeInForce,
#[serde(rename = "type")]
pub type_: OrderType,
pub side: OrderSide,
pub fills: Vec<Fill>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Fill {
pub price: f64,
pub qty: f64,
pub commission: f64,
}
Loading