Skip to content

Commit

Permalink
serde does not use intermediate buffer and API uses Bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
RPallas92 committed Nov 10, 2024
1 parent a8ef904 commit c1ac382
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 293 deletions.
39 changes: 22 additions & 17 deletions benches/graus_db_single_thread.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::{Bytes, BytesMut};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use graus_db::GrausDb;
use rand::prelude::*;
Expand All @@ -13,8 +14,11 @@ fn set_bench(c: &mut Criterion) {
(GrausDb::open(temp_dir.path()).unwrap(), temp_dir)
},
|(store, _temp_dir)| {
let value = Bytes::from_static(b"value");
for i in 1..(1 << 12) {
store.set(format!("key{}", i), b"value").unwrap();
store
.set(Bytes::from(format!("key{}", i)), value.clone())
.unwrap();
}
},
BatchSize::SmallInput,
Expand All @@ -30,29 +34,27 @@ fn update_if_bench(c: &mut Criterion) {
|| {
let temp_dir = TempDir::new().unwrap();
let store = GrausDb::open(temp_dir.path()).unwrap();
let key = "key1";
let key = Bytes::from_static(b"key1");
let value: u64 = 3500;
store.set(key.to_owned(), &value.to_le_bytes()).unwrap();
store
.set(key.clone(), Bytes::copy_from_slice(&value.to_be_bytes()))
.unwrap();
(store, temp_dir, key)
},
|(store, _temp_dir, key)| {
let update_fn = |value: &mut [u8]| {
let num = u64::from_le_bytes(value.try_into().expect("incorrect length"));
let incremented_num = num - 1;
value.copy_from_slice(&incremented_num.to_le_bytes());
let update_fn = |value: &mut BytesMut| {
let num = u64::from_le_bytes(
value.as_ref()[..8].try_into().expect("incorrect length"),
) - 1;
value.copy_from_slice(&num.to_le_bytes());
};
let predicate = |value: &[u8]| {
let num = u64::from_le_bytes(value.try_into().expect("incorrect length"));
let predicate = |value: &Bytes| {
let num = u64::from_le_bytes(value[..].try_into().expect("incorrect length"));
num > 0
};

for _ in 1..(1 << 12) {
let _ = store.update_if(
key.to_owned(),
update_fn,
Some(key.to_owned()),
Some(predicate),
);
let _ = store.update_if(key.to_owned(), update_fn, Some(&key), Some(predicate));
}
},
BatchSize::SmallInput,
Expand All @@ -67,13 +69,16 @@ fn get_bench(c: &mut Criterion) {
group.bench_with_input(format!("graus_db_get_{}", i), i, |b, i| {
let temp_dir = TempDir::new().unwrap();
let store = GrausDb::open(temp_dir.path()).unwrap();
let value = Bytes::from_static(b"value");
for key_i in 1..(1 << i) {
store.set(format!("key{}", key_i), b"value").unwrap();
store
.set(Bytes::from(format!("key{}", key_i)), value.clone())
.unwrap();
}
let mut rng = SmallRng::from_seed([0; 16]);
b.iter(|| {
store
.get(format!("key{}", rng.gen_range(1, 1 << i)))
.get(&Bytes::from(format!("key{}", rng.gen_range(1, 1 << i))))
.unwrap();
})
});
Expand Down
18 changes: 6 additions & 12 deletions src/db_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,17 @@ use bytes::Bytes;
/// Struct representing a command to the database.
#[derive(Debug, PartialEq)]
pub enum Command {
// TODO Ricardo &str instead?
Set { key: String, value: Bytes },
Remove { key: String },
Set { key: Bytes, value: Bytes },
Remove { key: Bytes },
}

impl Command {
pub fn set<K: AsRef<str>>(key: K, value: Bytes) -> Command {
Command::Set {
key: key.as_ref().to_owned(),
value,
}
pub fn set(key: Bytes, value: Bytes) -> Command {
Command::Set { key, value }
}

pub fn remove<K: AsRef<str>>(key: K) -> Command {
Command::Remove {
key: key.as_ref().to_owned(),
}
pub fn remove(key: Bytes) -> Command {
Command::Remove { key }
}
}
/// Struct representing the position of a command in a given file.
Expand Down
48 changes: 23 additions & 25 deletions src/graus_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::log_storage::log_helpers::{get_log_ids, load_log, log_path, new_log_f
use crate::log_storage::log_reader::LogReader;
use crate::log_storage::log_writer::LogWriter;
use crate::{GrausError, Result};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use crossbeam_skiplist::SkipMap;
use std::cell::RefCell;
use std::fs::{self, File};
Expand All @@ -24,17 +24,19 @@ use std::{collections::HashMap, path::PathBuf};
/// # use graus_db::{GrausDb, Result};
/// # fn try_main() -> Result<()> {
/// use std::env::current_dir;
/// use bytes::Bytes;
///
/// let store = GrausDb::open(current_dir()?)?;
/// store.set("key", b"value")?;
/// let val = store.get("key".to_owned())?;
/// assert_eq!(val, Some("value".into()));
/// store.set(Bytes::from_static(b"key"), Bytes::from_static(b"value"))?;
/// let val = store.get(&Bytes::from_static(b"key"))?;
/// assert_eq!(val, Some(Bytes::from_static(b"value")));
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct GrausDb {
// Index that maps every Key to a position in a log file.
index: Arc<SkipMap<String, CommandPos>>,
index: Arc<SkipMap<Bytes, CommandPos>>,
// Writes new data into the file system logs. Protected by a mutex.
writer: Arc<Mutex<LogWriter>>,
// Reads data from the file system logs.
Expand Down Expand Up @@ -66,7 +68,7 @@ impl GrausDb {
for &log_id in &log_ids {
let log_path = log_path(&path, log_id);
let mut reader = BufReaderWithPos::new(File::open(&log_path)?)?;
uncompacted += load_log(log_id, &mut reader, &*index)?;
uncompacted += load_log(log_id, &mut reader, &index)?;
readers.insert(log_id, reader);
}

Expand Down Expand Up @@ -99,21 +101,17 @@ impl GrausDb {
/// Sets the value of a string key to a string.
///
/// If the key already exists, the previous value will be overwritten.
pub fn set<K: AsRef<str>>(&self, key: K, value: &[u8]) -> Result<()> {
self.writer
.lock()
.unwrap()
.set(key, Bytes::copy_from_slice(value))
pub fn set(&self, key: Bytes, value: Bytes) -> Result<()> {
self.writer.lock().unwrap().set(key, value)
}

/// Gets the string value of a given string key.
///
/// Returns `None` if the given key does not exist.
pub fn get<K: AsRef<str>>(&self, key: K) -> Result<Option<Vec<u8>>> {
// TODO Ricardo return &[u8]
if let Some(cmd_pos) = self.index.get(key.as_ref()) {
pub fn get(&self, key: &Bytes) -> Result<Option<Bytes>> {
if let Some(cmd_pos) = self.index.get(key) {
if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
Ok(Some(value.to_vec()))
Ok(Some(value))
} else {
Err(GrausError::UnexpectedCommandType)
}
Expand All @@ -125,29 +123,28 @@ impl GrausDb {
/// Removes a given key.
///
/// Returns GrausError::KeyNotFound if the key does not exist.
pub fn remove(&self, key: String) -> Result<()> {
pub fn remove(&self, key: Bytes) -> Result<()> {
self.writer.lock().unwrap().remove(key)
}

/// Updates atomically an existing value.
///
/// If predicate_key and predicate are provided, it won´t update the value if the predicate
/// is not satisfied for predicate_key.
pub fn update_if<K, F, P>(
pub fn update_if<F, P>(
&self,
key: K,
key: Bytes,
update_fn: F,
predicate_key: Option<K>,
predicate_key: Option<&Bytes>,
predicate: Option<P>,
) -> Result<()>
where
K: AsRef<str>,
F: FnOnce(&mut [u8]),
P: FnOnce(&[u8]) -> bool,
F: FnOnce(&mut BytesMut),
P: FnOnce(&Bytes) -> bool,
{
let mut writer = self.writer.lock().unwrap();
let current_value = self.get(&key)?;
let Some(mut current_value) = current_value else {
let Some(current_value) = current_value else {
return Err(GrausError::KeyNotFound);
};

Expand All @@ -161,7 +158,8 @@ impl GrausDb {
}
}

update_fn(&mut current_value);
writer.set(key, Bytes::from(current_value))
let mut current_value_mut = BytesMut::from(current_value);
update_fn(&mut current_value_mut);
writer.set(key, current_value_mut.freeze())
}
}
28 changes: 8 additions & 20 deletions src/io_types.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
use crate::Result;
use std::{
fs::{File, Metadata},
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
};

pub trait MetadataReader {
fn metadata(&self) -> io::Result<Metadata>;
}

impl MetadataReader for File {
fn metadata(&self) -> io::Result<Metadata> {
self.metadata()
}
}
use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};

/// A buffered reader that stores the current position
pub struct BufReaderWithPos<R: Read + Seek + MetadataReader> {
pub struct BufReaderWithPos<R: Read + Seek> {
pub pos: u64,
reader: BufReader<R>,
}

impl<R: Read + Seek + MetadataReader> BufReaderWithPos<R> {
impl<R: Read + Seek> BufReaderWithPos<R> {
pub fn new(mut inner: R) -> Result<Self> {
let pos = inner.seek(SeekFrom::Current(0))?;
Ok(BufReaderWithPos {
Expand All @@ -29,20 +16,21 @@ impl<R: Read + Seek + MetadataReader> BufReaderWithPos<R> {
})
}

pub fn get_metadata(&self) -> io::Result<Metadata> {
self.reader.get_ref().metadata()
pub fn is_exhausted(&mut self) -> Result<bool> {
let buf = self.reader.fill_buf()?;
Ok(buf.is_empty())
}
}

impl<R: Read + Seek + MetadataReader> Read for BufReaderWithPos<R> {
impl<R: Read + Seek> Read for BufReaderWithPos<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.reader.read(buf)?;
self.pos += len as u64;
Ok(len)
}
}

impl<R: Read + Seek + MetadataReader> Seek for BufReaderWithPos<R> {
impl<R: Read + Seek> Seek for BufReaderWithPos<R> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.pos = self.reader.seek(pos)?;
Ok(self.pos)
Expand Down
Loading

0 comments on commit c1ac382

Please sign in to comment.