Skip to content

Commit

Permalink
custom serde
Browse files Browse the repository at this point in the history
  • Loading branch information
RPallas92 committed Nov 10, 2024
1 parent d99bbf2 commit 96eae17
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 52 deletions.
25 changes: 2 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ documentation = "https://github.com/rpallas92/GrausDB"
readme = "README.md"

[dependencies]
bincode = "1.3.3"
bytes = "1.8.0"
crossbeam-skiplist = "0.1"
log = "0.4.6"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11.15"
thiserror = "1.0"

[dev-dependencies]
Expand Down
20 changes: 8 additions & 12 deletions src/db_command.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
use serde::{Deserialize, Serialize};
use std::ops::Range;

use bytes::Bytes;

/// Enum representing a command to the database.
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, PartialEq)]
pub enum Command {
Set {
key: String,
#[serde(with = "serde_bytes")]
value: Vec<u8>,
},
Remove {
key: String,
},
// TODO Ricardo &str instead?
Set { key: String, value: Bytes },
Remove { key: String },
}

impl Command {
pub fn set<K: AsRef<str>>(key: K, value: &[u8]) -> Command {
pub fn set<K: AsRef<str>>(key: K, value: Bytes) -> Command {
Command::Set {
key: key.as_ref().to_owned(),
value: value.to_vec(),
value,
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub enum GrausError {
/// Removing non-existent key error.
#[error("Key not found")]
KeyNotFound,
#[error("Bincode error: {0}")]
/// Serialization or deserialization bincode error.
Bincode(#[from] Box<bincode::ErrorKind>),
/// Serialization or deserialization error.
#[error("{0}")]
SerializationError(String),
/// Unexpected command type error.
/// It indicates a corrupted log or a program bug.
#[error("Unexpected command type")]
Expand Down
15 changes: 12 additions & 3 deletions src/graus_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_f
use crate::log_storage::log_reader::LogReader;
use crate::log_storage::log_writer::LogWriter;
use crate::{GrausError, Result};
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use std::cell::RefCell;
use std::fs::{self, File};
Expand Down Expand Up @@ -40,6 +41,10 @@ pub struct GrausDb {
reader: LogReader,
}

// TODO Ricardo update DOCS as now we don't use Strings

// TODO Ricardo add clippy as linter

impl GrausDb {
/// Opens a `GrausDb` with the given path.
///
Expand Down Expand Up @@ -95,16 +100,20 @@ impl GrausDb {
///
/// If the key already exists, the previous value will be overwritten.
pub fn set<K: AsRef<str>>(&self, key: K, value: &[u8]) -> Result<()> {
self.writer.lock().unwrap().set(key, value)
self.writer
.lock()
.unwrap()
.set(key, Bytes::copy_from_slice(value))
}

/// Gets the value (as raw bytes) of a given string key.
///
/// Returns `None` if the given key does not exist.
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<Option<Vec<u8>>> {
// TODO Ricardo return &[u8]
if let Some(cmd_pos) = self.index.get(key.as_ref()) {
if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
Ok(Some(value))
Ok(Some(value.to_vec()))
} else {
Err(GrausError::UnexpectedCommandType)
}
Expand Down Expand Up @@ -153,6 +162,6 @@ impl GrausDb {
}

update_fn(&mut current_value);
writer.set(key, &current_value)
writer.set(key, Bytes::from(current_value))
}
}
160 changes: 160 additions & 0 deletions src/log_storage/db_command_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use bytes::{BufMut, Bytes, BytesMut};
use std::convert::TryInto;

use crate::db_command::Command;
use crate::{GrausError, Result};

const SET_COMMAND_KEY: u8 = 0;
const REMOVE_COMMAND_KEY: u8 = 1;

/// Encodes a [`Command`] into its binary log representation.
///
/// Layout (length prefixes are written with `BufMut::put_u32`, i.e. big-endian):
/// - `Set`:    `[SET_COMMAND_KEY][key len: u32][key bytes][value len: u32][value bytes]`
/// - `Remove`: `[REMOVE_COMMAND_KEY][key len: u32][key bytes]`
///
/// Note: lengths are narrowed with `as u32`, so a key or value longer than
/// `u32::MAX` bytes would have its length prefix truncated.
pub(crate) fn serialize_command(command: &Command) -> Bytes {
    // Size of the one-byte command tag and of each u32 length prefix.
    const TAG_LEN: usize = 1;
    const LEN_PREFIX: usize = 4;

    // Both variants share the tag + key section; only `Set` carries a value.
    let (tag, key, value) = match command {
        Command::Set { key, value } => (SET_COMMAND_KEY, key, Some(value)),
        Command::Remove { key } => (REMOVE_COMMAND_KEY, key, None),
    };

    let key_bytes = key.as_bytes();
    let value_section = value.map_or(0, |v| LEN_PREFIX + v.len());
    let capacity = TAG_LEN + LEN_PREFIX + key_bytes.len() + value_section;

    let mut out = BytesMut::with_capacity(capacity);
    out.put_u8(tag);
    out.put_u32(key_bytes.len() as u32);
    out.put_slice(key_bytes);
    if let Some(v) = value {
        out.put_u32(v.len() as u32);
        out.put_slice(v);
    }

    out.freeze()
}

/// Decodes a single [`Command`] from the start of `buf`.
///
/// Returns the number of bytes consumed together with the decoded command,
/// so callers can advance through a buffer holding several commands.
///
/// # Errors
///
/// Returns [`GrausError::SerializationError`] when `buf` is empty, the command
/// tag is unknown, a length-prefixed word is truncated, or the key bytes are
/// not valid UTF-8.
pub(crate) fn deserialize_command(buf: Bytes) -> Result<(usize, Command)> {
    // Size of the leading command-type tag.
    const TAG_LEN: usize = 1;

    // Indexing `buf[0]` would panic on an empty buffer; report it as an error instead.
    let tag = match buf.first() {
        Some(&tag) => tag,
        None => {
            return Err(GrausError::SerializationError(String::from(
                "Cannot deserialize a command from an empty buffer",
            )))
        }
    };

    match tag {
        SET_COMMAND_KEY => {
            let (key_bytes_read, key) = read_word(&buf, TAG_LEN)?;
            let (value_bytes_read, value) = read_word(&buf, TAG_LEN + key_bytes_read)?;
            let key = decode_key(&key)?;
            let total_bytes_read = TAG_LEN + key_bytes_read + value_bytes_read;
            Ok((total_bytes_read, Command::set(key, value)))
        }
        REMOVE_COMMAND_KEY => {
            let (key_bytes_read, key) = read_word(&buf, TAG_LEN)?;
            let key = decode_key(&key)?;
            let total_bytes_read = TAG_LEN + key_bytes_read;
            Ok((total_bytes_read, Command::remove(key)))
        }
        _ => Err(GrausError::SerializationError(String::from(
            "Invalid command found",
        ))),
    }
}

/// Converts raw key bytes into an owned `String`, rejecting invalid UTF-8.
///
/// The bytes come from a log file that may be corrupted, so they are validated
/// rather than trusted (building a `str` from invalid UTF-8 is undefined behaviour).
fn decode_key(raw: &[u8]) -> Result<String> {
    std::str::from_utf8(raw)
        .map(str::to_owned)
        .map_err(|e| GrausError::SerializationError(format!("Key is not valid UTF-8: {}", e)))
}

/// Cursor over a buffer that holds zero or more serialized commands back to back.
pub struct CommandDeserializer {
    /// The serialized commands.
    buf: Bytes,
    /// Offset into `buf` of the next command to decode.
    pub pos: usize,
}

impl CommandDeserializer {
    /// Creates a deserializer positioned at the start of `buf`.
    pub fn new(buf: Bytes) -> Self {
        Self { buf, pos: 0 }
    }
}

/// Yields the commands stored in the buffer, in order.
///
/// Iteration ends when the cursor reaches the end of the buffer. If a command
/// fails to decode, the error is yielded once and the iterator then ends.
impl Iterator for CommandDeserializer {
    type Item = Result<Command>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.buf.len() {
            return None;
        }

        match deserialize_command(self.buf.slice(self.pos..)) {
            Ok((bytes_read, command)) => {
                self.pos += bytes_read;
                Some(Ok(command))
            }
            Err(e) => {
                // Nothing after a corrupted command can be decoded reliably, and
                // leaving `pos` untouched would make every later call yield the
                // same error forever. Move the cursor to the end so iteration stops.
                self.pos = self.buf.len();
                Some(Err(e))
            }
        }
    }
}

/// Reads one length-prefixed word from `buf`, starting at offset `pos`.
///
/// A word is composed of `{word_size}{word}` where:
/// - `word_size` is a 4-byte big-endian `u32`
/// - and `word` is `word_size` bytes long
///
/// Returns the total number of bytes consumed (prefix plus content) and the
/// word content as a slice of `buf`.
///
/// # Errors
///
/// Returns [`GrausError::SerializationError`] when `pos` is outside the buffer,
/// or when the buffer is too short to hold the size prefix or the word content.
fn read_word(buf: &Bytes, pos: usize) -> Result<(usize, Bytes)> {
    // Width of the `word_size` prefix.
    const LEN_PREFIX: usize = 4;

    if pos >= buf.len() {
        return Err(GrausError::SerializationError(String::from(
            "Trying to read bytes outside the buffer len",
        )));
    }

    // `pos < buf.len()` was checked above, so this subtraction cannot underflow.
    // Without this check, slicing `pos..pos + 4` panics on a truncated buffer.
    if buf.len() - pos < LEN_PREFIX {
        return Err(GrausError::SerializationError(String::from(
            "Insufficient bytes to read word size",
        )));
    }

    let word_start_pos = pos + LEN_PREFIX;
    let len_bytes: [u8; LEN_PREFIX] = buf[pos..word_start_pos]
        .try_into()
        .expect("slice is exactly LEN_PREFIX bytes long");
    let word_len = u32::from_be_bytes(len_bytes) as usize;

    // Compare against the remaining length instead of computing
    // `word_start_pos + word_len`, which could overflow on 32-bit targets.
    if word_len > buf.len() - word_start_pos {
        return Err(GrausError::SerializationError(String::from(
            "Insufficient bytes to read word content",
        )));
    }

    let word_bytes = buf.slice(word_start_pos..word_start_pos + word_len);
    let total_bytes_read = LEN_PREFIX + word_len;

    Ok((total_bytes_read, word_bytes))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db_command::Command;

    /// Serializes `original`, deserializes the bytes again and checks that the
    /// decoded command equals the one we started with.
    fn assert_round_trip(original: Command) {
        let encoded = serialize_command(&original);
        let (_, decoded) = deserialize_command(encoded).unwrap();

        assert_eq!(original, decoded);
    }

    #[test]
    fn test_serde_set_command() {
        assert_round_trip(Command::Set {
            key: String::from("test_key"),
            value: Bytes::from_static(b"test value"),
        });
    }

    #[test]
    fn test_serde_remove_command() {
        assert_round_trip(Command::Remove {
            key: String::from("test_key"),
        });
    }
}
18 changes: 15 additions & 3 deletions src/log_storage/log_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ use crate::{
db_command::{Command, CommandPos},
io_types::{BufReaderWithPos, BufWriterWithPos},
};
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use std::io::Seek;
use std::io::{Read, Seek};
use std::{
ffi::OsStr,
fs::{self, File, OpenOptions},
io::SeekFrom,
path::{Path, PathBuf},
};

use super::db_command_serde::CommandDeserializer;

// Returns sorted existing log ids in the given directory (path).
pub fn get_log_ids(path: &Path) -> Result<Vec<u64>> {
let mut log_ids: Vec<u64> = fs::read_dir(&path)?
Expand Down Expand Up @@ -53,8 +56,17 @@ pub fn load_log(
let mut pos = reader.seek(SeekFrom::Start(0))?;
let mut uncompacted = 0; // number of bytes that can be saved after a compaction.

while let Ok(command) = bincode::deserialize_from::<_, Command>(&mut *reader) {
let new_pos = reader.stream_position()? as u64;
// Read the entire content into a buffer.
let mut buf = Vec::new();
reader.read_to_end(&mut buf)?;

// Create an iterator for deserializing commands.
let mut deserializer = CommandDeserializer::new(Bytes::from(buf));

// Iterate over the deserialized commands.
while let Some(command) = deserializer.next() {
let new_pos = deserializer.pos as u64;
let command = command?;
match command {
Command::Set { key, .. } => {
let old_cmd = index.insert(
Expand Down
10 changes: 8 additions & 2 deletions src/log_storage/log_reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use bytes::Bytes;

use super::db_command_serde::deserialize_command;
use super::log_helpers::log_path;
use crate::db_command::Command;
use crate::Result;
Expand Down Expand Up @@ -67,8 +70,11 @@ impl LogReader {
}

pub fn read_command(&self, cmd_pos: CommandPos) -> Result<Command> {
self.read_and(cmd_pos, |cmd_reader| {
Ok(bincode::deserialize_from(cmd_reader)?)
self.read_and(cmd_pos, |mut cmd_reader| {
let mut buf = Vec::with_capacity(cmd_pos.len as usize);
cmd_reader.read_to_end(&mut buf)?;
let (_, command) = deserialize_command(Bytes::from(buf))?;
Ok(command)
})
}
}
Expand Down
Loading

0 comments on commit 96eae17

Please sign in to comment.