Skip to content

Commit

Permalink
Finally, get a client-server tcp connection working reliably
Browse files Browse the repository at this point in the history
  • Loading branch information
weezy20 committed Feb 24, 2024
1 parent fe209a9 commit ad9026d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 17 deletions.
12 changes: 7 additions & 5 deletions crates/kvs-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use common::message::Payload;
use common::{Get, Message, Rm, Set};
use common::{Get, Message, Rm, Set, MessageType};
use kvs::cli::{Action, GetCmd, RmCmd, SetCmd};
use kvs::exit_program;
use log::trace;
Expand Down Expand Up @@ -36,11 +36,11 @@ fn main() -> anyhow::Result<()> {
}

fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> {
let mut message_bytes = vec![0_u8; 1024];
let mut message_bytes : Vec<u8> = vec![];
let r#type = match payload {
Payload::Set { .. } => 0_i32,
Payload::Get { .. } => 1_i32,
Payload::Rm { .. } => 2_i32,
Payload::Set { .. } => MessageType::Set as i32, // 0
Payload::Get { .. } => MessageType::Get as i32, // 1
Payload::Rm { .. } => MessageType::Rm as i32, // 2
};
let message = Message {
r#type,
Expand All @@ -56,6 +56,8 @@ fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> {
server.flush()?;
log::debug!("Written {} bytes to server stream", message_bytes.len());
log::trace!("Bytes -> {message_bytes:?}");
server.shutdown(std::net::Shutdown::Write)?;
log::trace!("Shut down server stream write");
}
// Clear buffer, await response
message_bytes.clear();
Expand Down
21 changes: 10 additions & 11 deletions crates/kvs-server/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,32 @@ use common::{Message, MessageType, Response};
use prost::Message as ProstMessage;
use std::{
io::{Read, Write},
net::TcpStream,
net::{Shutdown, TcpStream},
};
use tracing::{error, info, trace};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace};

pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> {
let mut buffer = vec![0_u8; 1024];
let mut buffer: Vec<u8> = vec![];
let bytes_read = stream.read_to_end(&mut buffer)?;
trace!("Request bytes read {bytes_read}");
if bytes_read == 0 {
bail!("Client terminated request.. aborting");
}
trace!("{bytes_read} bytes read : {buffer:?}");
if buffer.iter().all(|x| *x == 0) {
bail!("Request is zeroes.. aborting");
}
// stream.shutdown(std::net::Shutdown::Read)?;
stream.shutdown(Shutdown::Read)?;
{
/* Response */
let response: Vec<u8> = handle_request(&buffer)?;
drop(buffer);
stream.write_all(response.as_slice())?;
stream.flush()?;
stream.shutdown(std::net::Shutdown::Write)?;
info!("Request completed 🚀");
stream.shutdown(Shutdown::Write)?;
trace!("Request completed 🚀");
}
Ok(())
}
fn handle_request(buffer: &[u8]) -> anyhow::Result<Vec<u8>> {
trace!("Handling request");
let request: Message = Message::decode(buffer).with_context(|| {
error!("🚨 Failed to parse request from client",);
"🚨 Server cannot decode request"
Expand All @@ -45,7 +44,7 @@ fn handle_request(buffer: &[u8]) -> anyhow::Result<Vec<u8>> {
value: "O RM ".to_string(),
},
};
let mut buffer = vec![0_u8; 1024];
let mut buffer: Vec<u8> = vec![];
response
.encode(&mut buffer)
.context("Server failed to encode response back to client")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/kvs-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() -> anyhow::Result<()> {
let server = TcpListener::bind(socket).expect("Failed to bind to socket");
for stream in server.incoming() {
let request_id = uuid::Uuid::new_v4();
let span = tracing::info_span!("Request Processing", %request_id );
let span = tracing::info_span!("Request Processing", %request_id);
let _span_enter = span.enter();
if let Err(err) = serve_request(stream?) {
error!(%err)
Expand Down

0 comments on commit ad9026d

Please sign in to comment.