Skip to content

Commit

Permalink
add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
NorbertBodziony committed Jul 31, 2023
1 parent 78efa54 commit 1eff027
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ garde = "0.11.2"
axum_garde = "0.11.2"
uuid7 = { version = "0.3.3" }
reqwest = "0.11.18"
log = "0.4.19"
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ reqwest = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
dotenvy = { workspace = true }
log = { workspace = true }
axum = { version = "0.6.12", features = ["ws"] }
axum-extra = { version = "0.7.2" }
axum-macros = "0.3.7"
Expand All @@ -37,4 +38,6 @@ axum-server = { version = "0.5", features = ["tls-rustls"] }
once_cell = "1.18.0"
shellexpand = "3.1.0"
cors = "0.1.0"
tower-http = { version = "0.3.4", features = ["cors"] }
tower-http = { version = "0.4.3", features = ["cors", "trace"] }
tracing-subscriber = { version = "0.3" }
tracing = "0.1.37"
1 change: 0 additions & 1 deletion server/src/app/app_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ pub async fn app_handler(
Some(msg) => match msg {
Ok(msg) => msg,
Err(_e) => {
println!("App disconnected");
let app_disconnected_event =
ServerToClient::AppDisconnectedEvent(AppDisconnectedEvent {
session_id: session_id.clone(),
Expand Down
25 changes: 24 additions & 1 deletion server/src/client/client_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
response::Response,
};
use futures::StreamExt;
use log::info;

use crate::{
errors::NightlyError,
Expand Down Expand Up @@ -54,7 +55,7 @@ pub async fn client_handler(
let (sender, mut receiver) = socket.split();
// Handle the new app connection here
// Wait for initialize message
let client_id = loop {
let client_id: String = loop {
let msg = match receiver.next().await {
Some(msg) => match msg {
Ok(msg) => msg,
Expand Down Expand Up @@ -101,6 +102,7 @@ pub async fn client_handler(
}
}
};
info!("Client connected: {}", client_id);
// Main loop request handler
loop {
let sessions = sessions.clone();
Expand Down Expand Up @@ -185,6 +187,8 @@ pub async fn client_handler(
continue;
}
};
info!("Client {} new msg {:?}", client_id, app_msg);

match app_msg {
ClientToServer::ConnectRequest(connect_request) => {
let mut sessions = sessions.write().await;
Expand All @@ -199,6 +203,12 @@ pub async fn client_handler(
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();

info!(
"Client {} session does not exist {}",
client_id, connect_request.session_id
);

continue;
}
};
Expand Down Expand Up @@ -250,6 +260,10 @@ pub async fn client_handler(
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();
info!(
"Client {} session does not exist {}",
client_id, new_payload_event_reply.session_id
);
continue;
}
};
Expand Down Expand Up @@ -298,6 +312,10 @@ pub async fn client_handler(
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();
info!(
"Client {} session does not exist {}",
client_id, get_info_request.session_id
);
continue;
}
};
Expand Down Expand Up @@ -325,6 +343,10 @@ pub async fn client_handler(
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();
info!(
"Client {} session does not exist {}",
client_id, get_pending_requests_request.session_id
);
continue;
}
};
Expand Down Expand Up @@ -386,5 +408,6 @@ pub async fn client_handler(
.unwrap_or_default();
}
}
info!("Client {} msg handled", client_id);
}
}
2 changes: 2 additions & 0 deletions server/src/handle_error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use axum::response::IntoResponse;
use hyper::StatusCode;
use log::error;
use tower::BoxError;

use crate::errors::NightlyError;

pub async fn handle_error(error: BoxError) -> impl IntoResponse {
error!("Request error {:?}", error);
if error.is::<tower::timeout::error::Elapsed>() {
return (
StatusCode::REQUEST_TIMEOUT,
Expand Down
13 changes: 7 additions & 6 deletions server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
structs::http_endpoints::HttpEndpoint,
utils::get_cors,
};

use tower_http::trace::TraceLayer;
pub async fn get_router() -> Router {
let state = ServerState {
sessions: Default::default(),
Expand All @@ -32,6 +32,9 @@ pub async fn get_router() -> Router {
// Start cleaning outdated sessions
start_cleaning_sessions(state.sessions.clone(), state.client_to_sessions.clone());
let cors = get_cors();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();

return Router::new()
.route("/client", get(on_new_client_connection))
Expand Down Expand Up @@ -62,14 +65,12 @@ pub async fn get_router() -> Router {
&HttpEndpoint::GetWalletsMetadata.to_string(),
get(get_wallets_metadata),
)
.with_state(state)
.layer(TraceLayer::new_for_http())
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.load_shed()
.concurrency_limit(1024)
.timeout(Duration::from_secs(10))
.into_inner(),
.timeout(Duration::from_secs(10)),
)
.with_state(state)
.layer(cors);
}
14 changes: 9 additions & 5 deletions server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum::extract::ws::{Message, WebSocket};
use axum_macros::FromRef;
use dashmap::{DashMap, DashSet};
use futures::{stream::SplitSink, SinkExt};
use log::info;
use tokio::sync::RwLock;

pub type SessionId = String;
Expand Down Expand Up @@ -50,11 +51,14 @@ pub trait SendToClient {
impl SendToClient for ClientSockets {
async fn send_to_client(&self, client_id: ClientId, msg: ServerToClient) -> Result<()> {
match &mut self.get_mut(&client_id) {
Some(client_socket) => Ok(client_socket
.send(Message::Text(
serde_json::to_string(&msg).expect("Serialization should work"),
))
.await?),
Some(client_socket) => {
info!("Send to client {}, msg: {:?}", client_id, msg);
return Ok(client_socket
.send(Message::Text(
serde_json::to_string(&msg).expect("Serialization should work"),
))
.await?);
}
None => Err(anyhow::anyhow!("No client socket found for session")),
}
}
Expand Down
14 changes: 9 additions & 5 deletions server/src/structs/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
use anyhow::Result;
use axum::extract::ws::{Message, WebSocket};
use futures::{stream::SplitSink, SinkExt};
use log::info;

#[derive(Debug)]
pub struct Session {
Expand All @@ -26,11 +27,14 @@ pub struct Session {
impl Session {
pub async fn send_to_app(&mut self, msg: ServerToApp) -> Result<()> {
match &mut self.app_state.app_socket {
Some(app_socket) => Ok(app_socket
.send(Message::Text(
serde_json::to_string(&msg).expect("Serialization should work"),
))
.await?),
Some(app_socket) => {
info!("Send to app {}, msg: {:?}", self.session_id, msg);
return Ok(app_socket
.send(Message::Text(
serde_json::to_string(&msg).expect("Serialization should work"),
))
.await?);
}
None => Err(anyhow::anyhow!("No app socket found for session")),
}
}
Expand Down

0 comments on commit 1eff027

Please sign in to comment.