diff --git a/src/bin/sqrl-server.rs b/src/bin/sqrl-server.rs
index bea37ea..c270cd8 100644
--- a/src/bin/sqrl-server.rs
+++ b/src/bin/sqrl-server.rs
@@ -84,10 +84,16 @@ async fn main() -> anyhow::Result<()> {
if clients.len() > 3 {
warn!("Replicating to many followers can greatly impact write performance");
}
- ReplicatedServer::new(clients.into(), app.log_file, app.addr)?
+ ReplicatedServer::new(clients.into(), app.log_file, app.addr)
+ .await?
+ .run()
+ .await
+ }
+ replication::Mode::Follower => {
+ StandaloneServer::new(app.log_file, app.addr)
+ .await?
.run()
.await
}
- replication::Mode::Follower => StandaloneServer::new(app.log_file, app.addr)?.run().await,
}
}
diff --git a/src/replication/server.rs b/src/replication/server.rs
index 9c584b6..60d2bc9 100644
--- a/src/replication/server.rs
+++ b/src/replication/server.rs
@@ -27,7 +27,7 @@ pub struct ReplicatedServer {
}
impl ReplicatedServer {
-    pub fn new<P>(
+    pub async fn new<P>(
clients: Mutex>,
path: P,
addr: SocketAddr,
@@ -37,7 +37,7 @@ impl ReplicatedServer {
{
Ok(Self {
addr,
- server: Arc::new(StandaloneServer::new(path, addr)?),
+ server: Arc::new(StandaloneServer::new(path, addr).await?),
remote_replicas: Arc::new(clients),
})
}
diff --git a/src/server.rs b/src/server.rs
index cfadd1f..9839c19 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -16,11 +16,11 @@ pub struct StandaloneServer {
}
impl StandaloneServer {
-    pub fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
+    pub async fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
where
        P: Into<PathBuf>,
{
- let store = Arc::new(KvStore::open(path)?);
+ let store = Arc::new(KvStore::open(path).await?);
Ok(Self { store, addr })
}
diff --git a/src/store.rs b/src/store.rs
index a351dde..01df9f6 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -12,7 +12,10 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
+use std::time::Duration;
use std::usize;
+use tokio::sync::mpsc::error::TryRecvError;
+use tokio::sync::mpsc::{Receiver, Sender};
use tracing::level_filters::LevelFilter;
use tracing::{self, debug, info, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
@@ -28,7 +31,7 @@ pub enum Operation {
pub struct StoreWriter {
active_log_file: PathBuf,
    active_log_handle: Option<Arc<RwLock<File>>>,
-    position: Arc<AtomicUsize>,
+ position: usize,
}
impl Default for StoreWriter {
@@ -36,7 +39,7 @@ impl Default for StoreWriter {
StoreWriter {
active_log_file: PathBuf::default(),
active_log_handle: None,
- position: Arc::new(AtomicUsize::new(0)),
+            position: 0,
}
}
}
@@ -62,6 +65,12 @@ pub struct KvStore {
/// The maximum size of a log file in bytes.
max_log_file_size: u64,
+ /// Channel on which compaction notifications are sent.
+ ///
+ /// This allows the [`KvStore`] to signal to the background task which receives
+ /// compaction jobs to begin its work.
+ compaction_tx: Sender<()>,
+
_tracing: Option>,
}
@@ -96,10 +105,11 @@ struct KeydirEntry {
timestamp: i64,
}
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug)]
struct StoreConfig {
log_location: PathBuf,
max_log_file_size: u64,
+ compaction_tx: Sender<()>,
}
impl KvsEngine for KvStore {
@@ -141,7 +151,8 @@ impl KvsEngine for KvStore {
active_file = ?self.writer.read().unwrap().active_log_file.display(),
"Compaction required"
);
- self.compact()?;
+ self.compaction_tx.send(()).await.unwrap();
+ self.set_active_log_handle()?;
}
Ok(())
}
@@ -197,12 +208,7 @@ impl KvsEngine for KvStore {
value: None,
};
- let pos = self
- .writer
- .read()
- .unwrap()
- .position
- .load(std::sync::atomic::Ordering::SeqCst);
+ let pos = self.writer.read().unwrap().position;
self.append_to_log(&tombstone)?;
debug!(
position = pos,
@@ -211,13 +217,14 @@ impl KvsEngine for KvStore {
);
if pos as u64 > self.max_log_file_size {
- self.compact()?;
debug!(
current_size = pos,
max_log_file_size = self.max_log_file_size,
active_file = ?self.writer.read().unwrap().active_log_file.display(),
"Compaction required"
);
+ self.compaction_tx.send(()).await.unwrap();
+ self.set_active_log_handle()?;
}
Ok(())
}
@@ -230,28 +237,31 @@ impl KvStore {
/// Create a new KvStore.
///
/// The store is created in memory and is not persisted to disk.
- fn new(config: StoreConfig) -> KvStore {
+ fn new(config: &StoreConfig) -> KvStore {
KvStore {
writer: Arc::new(RwLock::new(StoreWriter::default())),
- log_location: config.log_location,
+ log_location: config.log_location.clone(),
keydir: Arc::new(DashMap::new()),
max_log_file_size: config.max_log_file_size,
+ compaction_tx: config.compaction_tx.clone(),
_tracing: None,
}
}
/// Open a KvStore at the given path.
-    pub fn open<P>(path: P) -> Result<KvStore>
+    pub async fn open<P>(path: P) -> Result<KvStore>
where
        P: Into<PathBuf>,
{
let path = path.into();
+ let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let store_config = StoreConfig {
log_location: path.clone(),
max_log_file_size: MAX_LOG_FILE_SIZE.with(|f| *f),
+ compaction_tx: tx,
};
- let mut store = KvStore::new(store_config.clone());
+ let mut store = KvStore::new(&store_config);
let log_level = std::env::var("KVS_LOG").unwrap_or("info".to_string());
store.setup_logging(log_level)?;
info!("Initialising store");
@@ -260,6 +270,24 @@ impl KvStore {
debug!("Creating initial log file");
store.set_active_log_handle()?;
+ tokio::spawn(async move {
+ let interval_ms: u64 = std::env::var("KVS_COMPACTION_INTERVAL_MS")
+ .unwrap_or_else(|_| "200".to_owned())
+ .parse()
+ .expect("Unable to parse default compaction interval");
+ let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
+ info!(interval_ms, "Background compaction polling");
+ loop {
+ interval.tick().await;
+ match rx.try_recv() {
+ Ok(_trigger) => {
+ info!("Compaction triggered");
+ }
+ Err(TryRecvError::Empty) => debug!("No compaction required"),
+ Err(TryRecvError::Disconnected) => panic!("Compaction channel unavailable"),
+ }
+ }
+ });
Ok(store)
}
@@ -330,21 +358,17 @@ impl KvStore {
fn append_to_log(&self, entry: &LogEntry) -> Result {
let data = bincode::serialize(&entry)?;
- let write_lock = self.writer.write().unwrap();
- let pos = write_lock
- .position
- .load(std::sync::atomic::Ordering::SeqCst);
+ let mut write_lock = self.writer.write().unwrap();
+ let pos = write_lock.position;
+ write_lock.position += data.len();
let mut file_lock = write_lock
.active_log_handle
.as_ref()
.ok_or(KvStoreError::NoActiveLogFile)?
.write()
.unwrap();
- file_lock.write_at(&data, pos as u64)?;
+        file_lock.write_at(&data, pos as u64)?;
file_lock.flush()?;
- write_lock
- .position
- .fetch_add(data.len(), std::sync::atomic::Ordering::SeqCst);
// Returning the offset of the entry in the log file after it has been written.
// This means that the next entry is written after this one.
Ok(pos)
@@ -438,7 +462,6 @@ impl KvStore {
});
}
- self.set_active_log_handle()?;
for file in active_files.as_ref() {
if *file.value() == 0 {
debug!(f = ?file.key(), "Removing file which has no entries");
@@ -459,9 +482,7 @@ impl KvStore {
.open(&next_log_file_name)?;
debug!(active_file = next_log_file_name, "Created new log file");
writer.active_log_file = PathBuf::from(next_log_file_name.clone());
- writer
- .position
- .store(0, std::sync::atomic::Ordering::SeqCst);
+ writer.position = 0;
writer.active_log_handle = Some(Arc::new(RwLock::new(log_file)));
Ok(())
}
@@ -486,10 +507,17 @@ impl KvStore {
pub(crate) fn setup_logging(&mut self, level: String) -> anyhow::Result<()> {
let level = LevelFilter::from_str(&level)?;
- let layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
+ let layer = tracing_subscriber::fmt::layer()
+ .compact()
+ .with_writer(std::io::stderr)
+ .with_thread_ids(true)
+ .with_line_number(true)
+ .with_file(true)
+ .with_target(false);
let subscriber = tracing_subscriber::registry().with(level).with(layer);
- let tracing_guard = tracing::subscriber::set_default(subscriber);
- self._tracing = Some(Arc::new(tracing_guard));
+ tracing::subscriber::set_global_default(subscriber)?;
+ //let tracing_guard = tracing::subscriber::set_default(subscriber);
+ //self._tracing = Some(Arc::new(tracing_guard));
Ok(())
}
}