diff --git a/crates/kvs-client/src/client.rs b/crates/kvs-client/src/client.rs index 52220ca..24bd03b 100644 --- a/crates/kvs-client/src/client.rs +++ b/crates/kvs-client/src/client.rs @@ -1,6 +1,6 @@ use anyhow::Context; use common::message::Payload; -use common::{Get, Message, Rm, Set}; +use common::{Get, Message, Rm, Set, MessageType}; use kvs::cli::{Action, GetCmd, RmCmd, SetCmd}; use kvs::exit_program; use log::trace; @@ -36,11 +36,11 @@ fn main() -> anyhow::Result<()> { } fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> { - let mut message_bytes = vec![0_u8; 1024]; + let mut message_bytes : Vec = vec![]; let r#type = match payload { - Payload::Set { .. } => 0_i32, - Payload::Get { .. } => 1_i32, - Payload::Rm { .. } => 2_i32, + Payload::Set { .. } => MessageType::Set as i32, // 0 + Payload::Get { .. } => MessageType::Get as i32, // 1 + Payload::Rm { .. } => MessageType::Rm as i32, // 2 }; let message = Message { r#type, @@ -56,6 +56,8 @@ fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> { server.flush()?; log::debug!("Written {} bytes to server stream", message_bytes.len()); log::trace!("Bytes -> {message_bytes:?}"); + server.shutdown(std::net::Shutdown::Write)?; + log::trace!("Shut down server stream write"); } // Clear buffer, await response message_bytes.clear(); diff --git a/crates/kvs-server/src/request.rs b/crates/kvs-server/src/request.rs index 16e517e..c62ec9b 100644 --- a/crates/kvs-server/src/request.rs +++ b/crates/kvs-server/src/request.rs @@ -3,33 +3,32 @@ use common::{Message, MessageType, Response}; use prost::Message as ProstMessage; use std::{ io::{Read, Write}, - net::TcpStream, + net::{Shutdown, TcpStream}, }; -use tracing::{error, info, trace}; +#[allow(unused_imports)] +use tracing::{debug, error, info, trace}; pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> { - let mut buffer = vec![0_u8; 1024]; + let mut buffer: Vec = vec![]; let bytes_read = stream.read_to_end(&mut buffer)?; - trace!("Request bytes read {bytes_read}"); - if bytes_read == 0 { - bail!("Client terminated request.. aborting"); - } + trace!("{bytes_read} bytes read : {buffer:?}"); if buffer.iter().all(|x| *x == 0) { bail!("Request is zeroes.. aborting"); } - // stream.shutdown(std::net::Shutdown::Read)?; + stream.shutdown(Shutdown::Read)?; { /* Response */ let response: Vec = handle_request(&buffer)?; drop(buffer); stream.write_all(response.as_slice())?; stream.flush()?; - stream.shutdown(std::net::Shutdown::Write)?; - info!("Request completed 🚀"); + stream.shutdown(Shutdown::Write)?; + trace!("Request completed 🚀"); } Ok(()) } fn handle_request(buffer: &[u8]) -> anyhow::Result> { + trace!("Handling request"); let request: Message = Message::decode(buffer).with_context(|| { error!("🚨 Failed to parse request from client",); "🚨 Server cannot decode request" @@ -45,7 +44,7 @@ fn handle_request(buffer: &[u8]) -> anyhow::Result> { value: "O RM ".to_string(), }, }; - let mut buffer = vec![0_u8; 1024]; + let mut buffer: Vec = vec![]; response .encode(&mut buffer) .context("Server failed to encode response back to client")?; diff --git a/crates/kvs-server/src/server.rs b/crates/kvs-server/src/server.rs index b15b358..66ab776 100644 --- a/crates/kvs-server/src/server.rs +++ b/crates/kvs-server/src/server.rs @@ -23,7 +23,7 @@ fn main() -> anyhow::Result<()> { let server = TcpListener::bind(socket).expect("Failed to bind to socket"); for stream in server.incoming() { let request_id = uuid::Uuid::new_v4(); - let span = tracing::info_span!("Request Processing", %request_id ); + let span = tracing::info_span!("Request Processing", %request_id); let _span_enter = span.enter(); if let Err(err) = serve_request(stream?) { error!(%err)