Skip to content

Commit

Permalink
incomplete req/res
Browse files Browse the repository at this point in the history
  • Loading branch information
weezy20 committed Feb 24, 2024
1 parent fac74be commit fe209a9
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 47 deletions.
17 changes: 17 additions & 0 deletions crates/common/src/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ syntax = "proto3";

package kvs_message;

// Request message types for different operations
enum MessageType {
SET = 0;
GET = 1;
RM = 2;
}

// Message to set a key-value pair
message Set {
string key = 1;
Expand All @@ -18,6 +25,16 @@ message Rm {
string key = 1;
}

// Message containing data for different operations
message Message {
MessageType type = 1;
oneof payload {
Set set = 2;
Get get = 3;
Rm rm = 4;
}
}

// Response from Server to Client if any
message Response {
string value = 1;
Expand Down
71 changes: 46 additions & 25 deletions crates/kvs-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,71 @@
use anyhow::Context;
use common::{Get, Rm, Set};
use common::message::Payload;
use common::{Get, Message, Rm, Set};
use kvs::cli::{Action, GetCmd, RmCmd, SetCmd};
use kvs::exit_program;
use prost::Message;
use log::trace;
use prost::Message as ProstMessage;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};

fn main() -> anyhow::Result<()> {
::env_logger::init();
let cli = <Cli as clap::Parser>::parse();
let server = cli.server.parse::<SocketAddr>()?;
let server = cli.addr.parse::<SocketAddr>()?;
let mut server = TcpStream::connect(server)?;
log::info!("connected to server {}", server.peer_addr()?);
match cli.action {
log::info!("🌐 Connected to server [{}]", server.peer_addr()?);
if match cli.action {
Action::Set(SetCmd { key, value }) => {
log::debug!("Requesting -> Set {} = {}", key, value);
send(Set { key, value }, &mut server)?;
log::debug!("✉️ Requesting -> Set {} = {}", key, value);
send(Payload::Set(Set { key, value }), &mut server)
}
Action::Get(GetCmd { key }) => {
log::debug!("Requesting -> Get {}", key);
send(Get { key }, &mut server)?;
log::debug!("✉️ Requesting -> Get {}", key);
send(Payload::Get(Get { key }), &mut server)
}
Action::Remove(RmCmd { key }) => {
log::debug!("Requesting -> Rm {}", key);
send(Rm { key }, &mut server)?;
log::debug!("✉️ Requesting -> Rm {}", key);
send(Payload::Rm(Rm { key }), &mut server)
}
}
.is_err()
{
exit_program(1);
}
exit_program(0);
}

fn send(message: impl prost::Message, server: &mut TcpStream) -> anyhow::Result<()> {
// Send message towards server
fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> {
let mut message_bytes = vec![0_u8; 1024];
message
.encode(&mut message_bytes)
.context("failed to encode message into bytes")?;
server.write_all(&message_bytes)?;
server.flush()?;
log::debug!("Written {} bytes to server stream", message_bytes.len());
let r#type = match payload {
Payload::Set { .. } => 0_i32,
Payload::Get { .. } => 1_i32,
Payload::Rm { .. } => 2_i32,
};
let message = Message {
r#type,
payload: Some(payload),
};
trace!("Message request -> {:#?}", message);
{
// Send message towards server
message
.encode(&mut message_bytes)
.context("failed to encode message into bytes")?;
server.write_all(&message_bytes)?;
server.flush()?;
log::debug!("Written {} bytes to server stream", message_bytes.len());
log::trace!("Bytes -> {message_bytes:?}");
}
// Clear buffer, await response
message_bytes.clear();
let bytes_read = server.read_to_end(&mut message_bytes)?;
log::debug!("Got {} bytes back ", bytes_read);
let response = common::Response::decode(&message_bytes[0..bytes_read])
.context("failed to decode message response from server")?;
println!("{}", response.value);
{
let bytes_read = server.read_to_end(&mut message_bytes)?;
log::debug!("Got {} bytes back ", bytes_read);
let response = common::Response::decode(&message_bytes[0..bytes_read])
.context("failed to decode message response from server")?;
println!("{}", response.value);
}
Ok(())
}

Expand All @@ -54,5 +75,5 @@ struct Cli {
action: Action,
/// Server location
#[arg(short, long, default_value = "127.0.0.1:4000")]
server: String,
addr: String,
}
64 changes: 45 additions & 19 deletions crates/kvs-server/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,53 @@
use anyhow::{bail, Context};
use common::{Message, MessageType, Response};
use prost::Message as ProstMessage;
use std::{
io::{Read, Write},
net::TcpStream,
};
use tracing::{error, info, trace};

use anyhow::Context;
use common::Response;
use prost::Message;

pub(crate) fn serve_request(mut s: TcpStream) -> anyhow::Result<()> {
pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> {
let mut buffer = vec![0_u8; 1024];
let bytes_read = s.read(&mut buffer)?;
// let _req =
// common::Get::decode(&buffer[..bytes_read]).context("Unimplemented decode. Use only GET")?;

// s.shutdown(std::net::Shutdown::Read)?;
let response = Response {
value: "OOOONF YOU SENT A GET ".to_string(),
};
buffer.clear();
response.encode(&mut buffer)?;
s.write_all(buffer.as_slice())?;
s.flush()?;
s.shutdown(std::net::Shutdown::Write)?;
tracing::info!("served :)");
let bytes_read = stream.read_to_end(&mut buffer)?;
trace!("Request bytes read {bytes_read}");
if bytes_read == 0 {
bail!("Client terminated request.. aborting");
}
if buffer.iter().all(|x| *x == 0) {
bail!("Request is zeroes.. aborting");
}
// stream.shutdown(std::net::Shutdown::Read)?;
{
/* Response */
let response: Vec<u8> = handle_request(&buffer)?;
drop(buffer);
stream.write_all(response.as_slice())?;
stream.flush()?;
stream.shutdown(std::net::Shutdown::Write)?;
info!("Request completed 🚀");
}
Ok(())
}
fn handle_request(buffer: &[u8]) -> anyhow::Result<Vec<u8>> {
let request: Message = Message::decode(buffer).with_context(|| {
error!("🚨 Failed to parse request from client",);
"🚨 Server cannot decode request"
})?;
let response = match MessageType::try_from(request.r#type)? {
MessageType::Set => Response {
value: "OOOONF YOU SENT A SET ".to_string(),
},
MessageType::Get => Response {
value: "GO GET ".to_string(),
},
MessageType::Rm => Response {
value: "O RM ".to_string(),
},
};
let mut buffer = vec![0_u8; 1024];
response
.encode(&mut buffer)
.context("Server failed to encode response back to client")?;
Ok(buffer)
}
6 changes: 4 additions & 2 deletions crates/kvs-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ fn main() -> anyhow::Result<()> {
let server = TcpListener::bind(socket).expect("Failed to bind to socket");
for stream in server.incoming() {
let request_id = uuid::Uuid::new_v4();
let span = tracing::info_span!("Processing", %request_id );
let span = tracing::info_span!("Request Processing", %request_id );
let _span_enter = span.enter();
serve_request(stream?)?;
if let Err(err) = serve_request(stream?) {
error!(%err)
}
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion server.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
export RUST_LOG=debug
export RUST_LOG=trace
cargo build --workspace;

cargo watch --watch crates/kvs-server -x 'r -p kvs-server -- --engine kvs'

0 comments on commit fe209a9

Please sign in to comment.