Skip to content

Commit

Permalink
wip : integrate server with kvs/sled
Browse files Browse the repository at this point in the history
  • Loading branch information
weezy20 committed Feb 24, 2024
1 parent 1883660 commit f8670c6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
3 changes: 2 additions & 1 deletion crates/common/src/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ message Message {

// Response from Server to Client if any
message Response {
string value = 1;
bool success = 1;
string value = 2;
}
2 changes: 1 addition & 1 deletion crates/kvs-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ fn send(payload: Payload, server: &mut TcpStream) -> anyhow::Result<()> {
log::debug!("Written {} bytes to server stream", message_bytes.len());
log::trace!("Bytes -> {message_bytes:?}");
server.shutdown(std::net::Shutdown::Write)?;
log::trace!("Shut down server stream write");
}
// Clear buffer, await response
message_bytes.clear();
{
// We depend on the server to shutdown the stream after it's finished sending a response
let bytes_read = server.read_to_end(&mut message_bytes)?;
log::debug!("Got {} bytes back ", bytes_read);
let response = common::Response::decode(&message_bytes[0..bytes_read])
Expand Down
17 changes: 12 additions & 5 deletions crates/kvs-server/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::{bail, Context};
use common::{Message, MessageType, Response};
use kvs::KvsEngine;
use crate::Backend;
use prost::Message as ProstMessage;
use std::{
io::{Read, Write},
Expand All @@ -8,7 +10,7 @@ use std::{
#[allow(unused_imports)]
use tracing::{debug, error, info, trace};

pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> {
pub(crate) fn serve_request(backend: &mut Backend, mut stream: TcpStream) -> anyhow::Result<()> {
let mut buffer: Vec<u8> = vec![];
let bytes_read = stream.read_to_end(&mut buffer)?;
trace!("{bytes_read} bytes read : {buffer:?}");
Expand All @@ -18,7 +20,7 @@ pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> {
stream.shutdown(Shutdown::Read)?;
{
/* Response */
let response: Vec<u8> = handle_request(&buffer)?;
let response: Vec<u8> = handle_request(backend , &buffer)?;
drop(buffer);
stream.write_all(response.as_slice())?;
stream.flush()?;
Expand All @@ -27,20 +29,25 @@ pub(crate) fn serve_request(mut stream: TcpStream) -> anyhow::Result<()> {
}
Ok(())
}
fn handle_request(buffer: &[u8]) -> anyhow::Result<Vec<u8>> {
fn handle_request(backend: &mut Backend, buffer: &[u8]) -> anyhow::Result<Vec<u8>> {
trace!("Handling request");
let request: Message = Message::decode(buffer).with_context(|| {
error!("🚨 Failed to parse request from client",);
"🚨 Server cannot decode request"
})?;
let response = match MessageType::try_from(request.r#type)? {
MessageType::Set => Response {
MessageType::Set => {
backend.set(&request.payload.unwrap().key, &request.payload.unwrap().value)?;
Response {
success: true,
value: "OOOONF YOU SENT A SET ".to_string(),
},
}},
MessageType::Get => Response {
success: true,
value: "GO GET ".to_string(),
},
MessageType::Rm => Response {
success: true,
value: "O RM ".to_string(),
},
};
Expand Down
46 changes: 36 additions & 10 deletions crates/kvs-server/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use kvs::exit_program;
use kvs::{exit_program, SledKvsEngine, KvStore};
use request::serve_request;
use std::net::{SocketAddr, TcpListener};
use std::env;
use tracing::{error, info};

mod request;
Expand All @@ -9,28 +10,35 @@ fn main() -> anyhow::Result<()> {
::env_logger::init();
let KvsServer { socket, engine } = <KvsServer as clap::Parser>::parse();
let socket: SocketAddr = socket.parse().expect("Failed to parse socket address");
let engine: Backend = match engine.expect("clap default used").as_str() {
"kvs" => Backend::Kvs,
"sled" => Backend::Sled,
let engine_str = engine.expect("clap default used");
let mut backend: Backend = match engine_str.as_str() {
"kvs" => {
Backend::Kvs(KvStore::open(env::current_dir()?)?)
}
"sled" => {
Backend::Sled(SledKvsEngine::open(env::current_dir()?)?)
},
_ => {
error!("Unsupported Engine");
exit_program(2);
}
};
info!("Starting KVS server version {}", env!("CARGO_PKG_VERSION"));
info!("Server configuration - IP:PORT: {socket}, Storage Engine: {engine}");
info!("Server configuration - IP:PORT: {socket}, Storage Engine: {}", engine_str);

let server = TcpListener::bind(socket).expect("Failed to bind to socket");
for stream in server.incoming() {
let request_id = uuid::Uuid::new_v4();
let span = tracing::info_span!("Request Processing", %request_id);
let _span_enter = span.enter();
if let Err(err) = serve_request(stream?) {
if let Err(err) = serve_request(&mut backend, stream?) {
error!(%err)
}
}
Ok(())
}


#[derive(clap::Parser)]
struct KvsServer {
#[arg(long = "addr", short = 'a', default_value = "127.0.0.1:4000")]
Expand All @@ -41,15 +49,33 @@ struct KvsServer {
engine: Option<String>,
}
enum Backend {
Kvs,
Sled,
Kvs(KvStore),
Sled(SledKvsEngine),
}

impl std::fmt::Display for Backend {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Backend::Kvs => write!(f, "Kvs"),
Backend::Sled => write!(f, "Sled"),
Backend::Kvs(_) => write!(f, "Kvs"),
Backend::Sled(_) => write!(f, "Sled"),
}
}
}

impl std::ops::Deref for Backend {
type Target = dyn kvs::KvsEngine;
fn deref(&self) -> &Self::Target {
match self {
Backend::Kvs(kvs) => kvs,
Backend::Sled(sled) => sled,
}
}
}
impl std::ops::DerefMut for Backend {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Backend::Kvs(kvs) => kvs,
Backend::Sled(sled) => sled,
}
}
}

0 comments on commit f8670c6

Please sign in to comment.