Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: background compaction #23

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/bin/sqrl-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,16 @@ async fn main() -> anyhow::Result<()> {
if clients.len() > 3 {
warn!("Replicating to many followers can greatly impact write performance");
}
ReplicatedServer::new(clients.into(), app.log_file, app.addr)?
ReplicatedServer::new(clients.into(), app.log_file, app.addr)
.await?
.run()
.await
}
replication::Mode::Follower => {
StandaloneServer::new(app.log_file, app.addr)
.await?
.run()
.await
}
replication::Mode::Follower => StandaloneServer::new(app.log_file, app.addr)?.run().await,
}
}
4 changes: 2 additions & 2 deletions src/replication/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct ReplicatedServer {
}

impl ReplicatedServer {
pub fn new<P>(
pub async fn new<P>(
clients: Mutex<Vec<RemoteNodeClient>>,
path: P,
addr: SocketAddr,
Expand All @@ -37,7 +37,7 @@ impl ReplicatedServer {
{
Ok(Self {
addr,
server: Arc::new(StandaloneServer::new(path, addr)?),
server: Arc::new(StandaloneServer::new(path, addr).await?),
remote_replicas: Arc::new(clients),
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ pub struct StandaloneServer {
}

impl StandaloneServer {
pub fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
pub async fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
where
P: Into<std::path::PathBuf>,
{
let store = Arc::new(KvStore::open(path)?);
let store = Arc::new(KvStore::open(path).await?);
Ok(Self { store, addr })
}

Expand Down
88 changes: 58 additions & 30 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::usize;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::level_filters::LevelFilter;
use tracing::{self, debug, info, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
Expand All @@ -28,15 +31,15 @@ pub enum Operation {
pub struct StoreWriter {
active_log_file: PathBuf,
active_log_handle: Option<Arc<RwLock<File>>>,
position: Arc<AtomicUsize>,
position: usize,
}

impl Default for StoreWriter {
fn default() -> Self {
StoreWriter {
active_log_file: PathBuf::default(),
active_log_handle: None,
position: Arc::new(AtomicUsize::new(0)),
position: *Arc::new(0),
}
}
}
Expand All @@ -62,6 +65,12 @@ pub struct KvStore {
/// The maximum size of a log file in bytes.
max_log_file_size: u64,

/// Channel on which compaction notifications are sent.
///
/// This allows the [`KvStore`] to signal to the background task which receives
/// compaction jobs to begin its work.
compaction_tx: Sender<()>,

_tracing: Option<Arc<tracing::subscriber::DefaultGuard>>,
}

Expand Down Expand Up @@ -96,10 +105,11 @@ struct KeydirEntry {
timestamp: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug)]
struct StoreConfig {
log_location: PathBuf,
max_log_file_size: u64,
compaction_tx: Sender<()>,
}

impl KvsEngine for KvStore {
Expand Down Expand Up @@ -141,7 +151,8 @@ impl KvsEngine for KvStore {
active_file = ?self.writer.read().unwrap().active_log_file.display(),
"Compaction required"
);
self.compact()?;
self.compaction_tx.send(()).await.unwrap();
self.set_active_log_handle()?;
}
Ok(())
}
Expand Down Expand Up @@ -197,12 +208,7 @@ impl KvsEngine for KvStore {
value: None,
};

let pos = self
.writer
.read()
.unwrap()
.position
.load(std::sync::atomic::Ordering::SeqCst);
let pos = self.writer.read().unwrap().position;
self.append_to_log(&tombstone)?;
debug!(
position = pos,
Expand All @@ -211,13 +217,14 @@ impl KvsEngine for KvStore {
);

if pos as u64 > self.max_log_file_size {
self.compact()?;
debug!(
current_size = pos,
max_log_file_size = self.max_log_file_size,
active_file = ?self.writer.read().unwrap().active_log_file.display(),
"Compaction required"
);
self.compaction_tx.send(()).await.unwrap();
self.set_active_log_handle()?;
}
Ok(())
}
Expand All @@ -230,28 +237,31 @@ impl KvStore {
/// Create a new KvStore.
///
/// The store is created in memory and is not persisted to disk.
fn new(config: StoreConfig) -> KvStore {
fn new(config: &StoreConfig) -> KvStore {
KvStore {
writer: Arc::new(RwLock::new(StoreWriter::default())),
log_location: config.log_location,
log_location: config.log_location.clone(),
keydir: Arc::new(DashMap::new()),
max_log_file_size: config.max_log_file_size,
compaction_tx: config.compaction_tx.clone(),
_tracing: None,
}
}

/// Open a KvStore at the given path.
pub fn open<P>(path: P) -> Result<KvStore>
pub async fn open<P>(path: P) -> Result<KvStore>
where
P: Into<PathBuf>,
{
let path = path.into();
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let store_config = StoreConfig {
log_location: path.clone(),
max_log_file_size: MAX_LOG_FILE_SIZE.with(|f| *f),
compaction_tx: tx,
};

let mut store = KvStore::new(store_config.clone());
let mut store = KvStore::new(&store_config);
let log_level = std::env::var("KVS_LOG").unwrap_or("info".to_string());
store.setup_logging(log_level)?;
info!("Initialising store");
Expand All @@ -260,6 +270,24 @@ impl KvStore {

debug!("Creating initial log file");
store.set_active_log_handle()?;
tokio::spawn(async move {
let interval_ms: u64 = std::env::var("KVS_COMPACTION_INTERVAL_MS")
.unwrap_or_else(|_| "200".to_owned())
.parse()
.expect("Unable to parse default compaction interval");
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
info!(interval_ms, "Background compaction polling");
loop {
interval.tick().await;
match rx.try_recv() {
Ok(_trigger) => {
info!("Compaction triggered");
}
Err(TryRecvError::Empty) => debug!("No compaction required"),
Err(TryRecvError::Disconnected) => panic!("Compaction channel unavailable"),
}
}
});
Ok(store)
}

Expand Down Expand Up @@ -330,21 +358,17 @@ impl KvStore {
fn append_to_log(&self, entry: &LogEntry) -> Result<usize> {
let data = bincode::serialize(&entry)?;

let write_lock = self.writer.write().unwrap();
let pos = write_lock
.position
.load(std::sync::atomic::Ordering::SeqCst);
let mut write_lock = self.writer.write().unwrap();
let pos = write_lock.position;
write_lock.position += data.len();
let mut file_lock = write_lock
.active_log_handle
.as_ref()
.ok_or(KvStoreError::NoActiveLogFile)?
.write()
.unwrap();
file_lock.write_at(&data, pos as u64)?;
file_lock.write_at(&data, write_lock.position as u64)?;
file_lock.flush()?;
write_lock
.position
.fetch_add(data.len(), std::sync::atomic::Ordering::SeqCst);
// Returning the offset of the entry in the log file after it has been written.
// This means that the next entry is written after this one.
Ok(pos)
Expand Down Expand Up @@ -438,7 +462,6 @@ impl KvStore {
});
}

self.set_active_log_handle()?;
for file in active_files.as_ref() {
if *file.value() == 0 {
debug!(f = ?file.key(), "Removing file which has no entries");
Expand All @@ -459,9 +482,7 @@ impl KvStore {
.open(&next_log_file_name)?;
debug!(active_file = next_log_file_name, "Created new log file");
writer.active_log_file = PathBuf::from(next_log_file_name.clone());
writer
.position
.store(0, std::sync::atomic::Ordering::SeqCst);
writer.position = 0;
writer.active_log_handle = Some(Arc::new(RwLock::new(log_file)));
Ok(())
}
Expand All @@ -486,10 +507,17 @@ impl KvStore {

pub(crate) fn setup_logging(&mut self, level: String) -> anyhow::Result<()> {
let level = LevelFilter::from_str(&level)?;
let layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_writer(std::io::stderr)
.with_thread_ids(true)
.with_line_number(true)
.with_file(true)
.with_target(false);
let subscriber = tracing_subscriber::registry().with(level).with(layer);
let tracing_guard = tracing::subscriber::set_default(subscriber);
self._tracing = Some(Arc::new(tracing_guard));
tracing::subscriber::set_global_default(subscriber)?;
//let tracing_guard = tracing::subscriber::set_default(subscriber);
//self._tracing = Some(Arc::new(tracing_guard));
Ok(())
}
}
Expand Down
Loading