From a54359b4d512e118ebadc6c37056f0a5bf3346dd Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Thu, 18 Apr 2024 22:35:10 +0100 Subject: [PATCH 1/6] feat: introduce StoreValue --- src/engine.rs | 6 +++--- src/lib.rs | 2 +- src/store.rs | 35 ++++++++++++++++++++++------------- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index b99bbac..3774883 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,6 +1,6 @@ use std::future::Future; -use crate::{proto::GetResponse, Result}; +use crate::{store::StoreValue, Result}; /// Generic trait implementation for pluggable storage engines outside of the one /// implemented by this crate. @@ -12,7 +12,7 @@ use crate::{proto::GetResponse, Result}; /// /// [`KvStore`]: crate::store::KvStore pub trait KvsEngine: Clone + Send + Sync + 'static { - fn set(&self, key: String, value: String) -> impl Future>; - fn get(&self, key: String) -> impl Future>>; + fn set(&self, key: String, value: StoreValue) -> impl Future>; + fn get(&self, key: String) -> impl Future>>; fn remove(&self, key: String) -> impl Future>; } diff --git a/src/lib.rs b/src/lib.rs index 76edb18..bc7d996 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ mod store; pub use engine::KvsEngine; pub use error::Error as KvStoreError; pub use server::StandaloneServer; -pub use store::KvStore; +pub use store::{KvStore, StoreValue}; pub mod action; pub mod client; diff --git a/src/store.rs b/src/store.rs index a351dde..22feec8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,5 +1,4 @@ use crate::engine::KvsEngine; -use crate::proto::GetResponse; use crate::{KvStoreError, Result}; use crate::{LOG_PREFIX, MAX_LOG_FILE_SIZE}; use dashmap::DashMap; @@ -24,6 +23,22 @@ pub enum Operation { Remove, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct StoreValue(pub Option>); + + +impl From<&str> for StoreValue { + fn from(value: &str) -> Self { + StoreValue(Some(value.into())) + } +} + +impl From for StoreValue { + fn from(value: String) -> Self { + StoreValue(Some(value.into_bytes())) + } +} + #[derive(Clone, Debug)] pub struct StoreWriter { active_log_file: PathBuf, @@ -78,7 +93,7 @@ struct LogEntry { /// the entry with the most recent timestamp wins. timestamp: i64, key: String, - value: Option, + value: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -104,8 +119,8 @@ struct StoreConfig { impl KvsEngine for KvStore { /// Set the value of a key by inserting the value into the store for the given key. - async fn set(&self, key: String, value: String) -> Result<()> { - debug!(key, value, "Setting key"); + async fn set(&self, key: String, value: StoreValue) -> Result<()> { + debug!(key, "Setting key"); let timestamp = chrono::Utc::now().timestamp(); let entry = LogEntry { timestamp, @@ -151,7 +166,7 @@ impl KvsEngine for KvStore { /// /// The timestamp is typically used with replication, as the value acts as /// a version number and conflict resolution mechanism. - async fn get(&self, key: String) -> Result> { + async fn get(&self, key: String) -> Result> { debug!(key, "Getting key"); match self.keydir.get(&key) { Some(entry) => { @@ -166,13 +181,7 @@ impl KvsEngine for KvStore { entry_file.seek(SeekFrom::Start(entry.offset as u64))?; let log_entry: LogEntry = bincode::deserialize_from(entry_file)?; match log_entry.value { - Some(value) => { - debug!(value, "Value exists"); - Ok(Some(GetResponse { - value: Some(value), - timestamp: log_entry.timestamp, - })) - } + Some(value) => Ok(Some(value)), // This is a tombstone value and equates to a deleted key and // the "Key not found" scenario. None => { @@ -190,7 +199,7 @@ impl KvsEngine for KvStore { debug!(key, "Removing key"); match self.keydir.remove(&key) { Some(_entry) => { - let tombstone = LogEntry { + let tombstone: LogEntry = LogEntry { timestamp: chrono::Utc::now().timestamp(), operation: Operation::Remove, key: key.clone(), From 3057c2a344cb44df7f19a9220bf75481946c93db Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Thu, 18 Apr 2024 22:35:46 +0100 Subject: [PATCH 2/6] refactor: use StoreValue across squirrel --- proto/actions.proto | 5 ++--- src/bin/sqrl-client.rs | 10 ++++++++-- src/client.rs | 6 ++++-- src/replication/server.rs | 26 +++++++++++++------------- src/server.rs | 16 +++++++--------- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/proto/actions.proto b/proto/actions.proto index 26a46b8..2fee7d8 100644 --- a/proto/actions.proto +++ b/proto/actions.proto @@ -14,7 +14,7 @@ message Acknowledgement { message SetRequest { string key = 1; - string value = 2; + bytes value = 2; int64 timestamp = 3; } @@ -23,8 +23,7 @@ message GetRequest { } message GetResponse { - optional string value = 1; - int64 timestamp = 2; + optional bytes value = 1; } message RemoveRequest { diff --git a/src/bin/sqrl-client.rs b/src/bin/sqrl-client.rs index 1476a7d..4b978f6 100644 --- a/src/bin/sqrl-client.rs +++ b/src/bin/sqrl-client.rs @@ -2,6 +2,7 @@ use clap::Parser; use sqrl::action::Action; use sqrl::client::{Client, RemoteNodeClient}; +use sqrl::StoreValue; #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] @@ -20,12 +21,17 @@ async fn main() -> anyhow::Result<()> { match cli.subcmd { Action::Set { key, value } => { - client.set(key, value).await?; + client + .set(key, StoreValue(Some(value.into_bytes()))) + .await?; } Action::Get { key } => { let response = client.get(key).await?; match response { - Some(v) => println!("{}", v.value.unwrap()), + Some(v) => { + let out = String::from_utf8_lossy(v.value()); + println!("{out}"); + } None => println!("Key not found"), } } diff --git a/src/client.rs b/src/client.rs index 8bc0039..a5be70a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,6 +6,7 @@ use crate::proto::action_client::ActionClient; use crate::proto::{Acknowledgement, GetResponse, RemoveRequest}; use crate::proto::{GetRequest, SetRequest}; pub use crate::replication::ReplicatedServer; +use crate::store::StoreValue; use crate::Result; /// A client used for interacting with the [`KvStore`] via gRPC requests. @@ -13,7 +14,7 @@ use crate::Result; pub trait Client { async fn get(&mut self, key: String) -> anyhow::Result>; - async fn set(&mut self, key: String, value: String) -> anyhow::Result; + async fn set(&mut self, key: String, value: StoreValue) -> anyhow::Result; async fn remove(&mut self, key: String) -> anyhow::Result; } @@ -45,7 +46,8 @@ impl RemoteNodeClient { #[tonic::async_trait] impl Client for RemoteNodeClient { - async fn set(&mut self, key: String, value: String) -> anyhow::Result { + async fn set(&mut self, key: String, value: StoreValue) -> anyhow::Result { + let value = value.0.unwrap_or_default(); let req = tonic::Request::new(SetRequest { key, value, diff --git a/src/replication/server.rs b/src/replication/server.rs index 9c584b6..5b33750 100644 --- a/src/replication/server.rs +++ b/src/replication/server.rs @@ -8,6 +8,7 @@ use tracing::{debug, info}; use crate::client::{Client, RemoteNodeClient}; use crate::proto::action_server::{Action, ActionServer}; use crate::proto::{Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest}; +use crate::store::StoreValue; use crate::{KvsEngine, StandaloneServer}; /// Wrapped implementation of a [`StandaloneServer`] with an awareness of multiple @@ -67,12 +68,9 @@ impl Action for ReplicatedServer { let key = req.key.clone(); let response = match self.server.store.get(key).await.unwrap() { Some(r) => r, - None => GetResponse { - value: None, - timestamp: 0, - }, + None => StoreValue(None), }; - Ok(tonic::Response::new(response)) + Ok(tonic::Response::new(GetResponse { value: response.0 })) } async fn set( @@ -83,14 +81,16 @@ impl Action for ReplicatedServer { debug!("Setting value to local store"); self.server .store - .set(req.key.clone(), req.value.clone()) + .set(req.key.clone(), StoreValue(Some(req.value.clone()))) .await .unwrap(); debug!("Replicating to remote replicas"); futures::stream::iter(self.remote_replicas.lock().await.iter_mut()) .for_each(|r| async { - r.set(req.key.clone(), req.value.clone()).await.unwrap(); + r.set(req.key.clone(), StoreValue(Some(req.value.clone()))) + .await + .unwrap(); }) .await; @@ -179,7 +179,7 @@ mod test { thread::sleep(Duration::from_millis(1500)); replicated_client - .set("key1".to_string(), "value1".to_string()) + .set("key1".to_string(), "value1".into()) .await .unwrap(); @@ -192,7 +192,7 @@ mod test { .unwrap() .unwrap() .value, - Some("value1".to_string()), + Some("value1".into()), "No replication for initial value" ); assert_eq!( @@ -206,12 +206,12 @@ mod test { .unwrap() .unwrap() .value, - Some("value1".to_string()), + Some("value1".into()), "No replication for initial value" ); replicated_client - .set("key1".to_string(), "overwritten".to_string()) + .set("key1".to_string(), "overwritten".into()) .await .unwrap(); wait_for_replication(); @@ -222,7 +222,7 @@ mod test { .unwrap() .unwrap() .value, - Some("overwritten".to_string()), + Some("overwritten".into()), "No replication for overwritten value" ); assert_eq!( @@ -232,7 +232,7 @@ mod test { .unwrap() .unwrap() .value, - Some("overwritten".to_string()), + Some("overwritten".into()), "No replication for overwritten value" ); diff --git a/src/server.rs b/src/server.rs index cfadd1f..a733653 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ use crate::proto::{ action_server::{Action, ActionServer}, Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest, }; +use crate::store::StoreValue; use crate::KvStore; use crate::KvsEngine; use std::{net::SocketAddr, sync::Arc}; @@ -46,14 +47,8 @@ impl Action for StandaloneServer { ) -> tonic::Result, tonic::Status> { let req = req.into_inner(); match self.store.get(req.key).await.unwrap() { - Some(v) => Ok(tonic::Response::new(GetResponse { - value: v.value, - timestamp: v.timestamp, - })), - None => Ok(tonic::Response::new(GetResponse { - value: None, - timestamp: 0, - })), + Some(value) => Ok(tonic::Response::new(GetResponse { value: value.0 })), + None => Ok(tonic::Response::new(GetResponse { value: None })), } } @@ -62,7 +57,10 @@ impl Action for StandaloneServer { req: tonic::Request, ) -> tonic::Result, tonic::Status> { let req = req.into_inner(); - self.store.set(req.key, req.value).await.unwrap(); + self.store + .set(req.key, StoreValue(Some(req.value))) + .await + .unwrap(); Ok(tonic::Response::new(Acknowledgement { success: true })) } From 5593e20c2ae895d2b52cd22d000af2bf01b53b0e Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Thu, 18 Apr 2024 22:36:18 +0100 Subject: [PATCH 3/6] refactor: update tests --- tests/kv_store.rs | 94 +++++++++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/tests/kv_store.rs b/tests/kv_store.rs index 2f2910d..98c34fa 100644 --- a/tests/kv_store.rs +++ b/tests/kv_store.rs @@ -1,5 +1,5 @@ use rand::Rng; -use sqrl::{KvStore, KvsEngine, Result}; +use sqrl::{KvStore, KvsEngine, Result, StoreValue}; use std::{collections::HashMap, sync::Arc}; use tempfile::TempDir; use tokio::sync::Barrier; @@ -11,28 +11,28 @@ async fn get_stored_value() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - store.set("key1".to_owned(), "value1".to_owned()).await?; - store.set("key2".to_owned(), "value2".to_owned()).await?; + store.set("key1".into(), "value1".into()).await?; + store.set("key2".into(), "value2".into()).await?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value1".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value1".into()) ); assert_eq!( - store.get("key2".to_owned()).await?.unwrap().value, - Some("value2".to_owned()) + store.get("key2".into()).await?.unwrap().0, + Some("value2".into()) ); // Open from disk again and check persistent data drop(store); let store = KvStore::open(temp_dir.path())?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value1".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value1".into()) ); assert_eq!( - store.get("key2".to_owned()).await?.unwrap().value, - Some("value2".to_owned()) + store.get("key2".into()).await?.unwrap().0, + Some("value2".into()) ); Ok(()) @@ -43,28 +43,28 @@ async fn overwrite_value() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - store.set("key1".to_owned(), "value1".to_owned()).await?; + store.set("key1".into(), "value1".into()).await?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value1".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value1".into()) ); - store.set("key1".to_owned(), "value2".to_owned()).await?; + store.set("key1".into(), "value2".into()).await?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value2".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value2".into()) ); // Open from disk again and check persistent data drop(store); let store = KvStore::open(temp_dir.path())?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value2".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value2".into()) ); - store.set("key1".to_owned(), "value3".to_owned()).await?; + store.set("key1".into(), "value3".into()).await?; assert_eq!( - store.get("key1".to_owned()).await?.unwrap().value, - Some("value3".to_owned()) + store.get("key1".into()).await?.unwrap().0, + Some("value3".into()) ); Ok(()) @@ -76,13 +76,13 @@ async fn get_non_existent_value() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - store.set("key1".to_owned(), "value1".to_owned()).await?; - assert_eq!(store.get("key2".to_owned()).await?, None); + store.set("key1".into(), "value1".into()).await?; + assert_eq!(store.get("key2".into()).await?, None); // Open from disk again and check persistent data drop(store); let store = KvStore::open(temp_dir.path())?; - assert_eq!(store.get("key2".to_owned()).await?, None); + assert_eq!(store.get("key2".into()).await?, None); Ok(()) } @@ -91,7 +91,7 @@ async fn get_non_existent_value() -> Result<()> { async fn remove_non_existent_key() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - assert!(store.remove("key1".to_owned()).await.is_err()); + assert!(store.remove("key1".into()).await.is_err()); Ok(()) } @@ -99,9 +99,9 @@ async fn remove_non_existent_key() -> Result<()> { async fn remove_key() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - store.set("key1".to_owned(), "value1".to_owned()).await?; - assert!(store.remove("key1".to_owned()).await.is_ok()); - assert_eq!(store.get("key1".to_owned()).await?, None); + store.set("key1".into(), "value1".into()).await?; + assert!(store.remove("key1".into()).await.is_ok()); + assert_eq!(store.get("key1".into()).await?, None); Ok(()) } @@ -128,7 +128,7 @@ async fn compaction() -> Result<()> { for key_id in 0..1000 { let key = format!("key{}", key_id); let value = format!("{}", iter); - store.set(key, value).await?; + store.set(key, value.into()).await?; } let new_size = dir_size(); @@ -144,8 +144,8 @@ async fn compaction() -> Result<()> { for key_id in 0..1000 { let key = format!("key{}", key_id); assert_eq!( - store.get(key).await?.unwrap().value, - Some(format!("{}", iter)) + store.get(key).await?.unwrap().0, + Some(format!("{}", iter).into()) ); } return Ok(()); @@ -161,7 +161,7 @@ async fn randomised_retrieval() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - let mut value_tracker = HashMap::new(); + let mut value_tracker: HashMap = HashMap::new(); let mut rng = rand::thread_rng(); for i in 0..1000 { let key = format!("key{}", i); @@ -170,13 +170,13 @@ async fn randomised_retrieval() -> Result<()> { // Always set some random keys on every iteration for _ in 0..100 { store - .set(key.clone(), format!("value{}", rng.gen::())) + .set(key.clone(), format!("value{}", rng.gen::()).into()) .await?; } if rng.gen::() % 2 == 0 { - store.set(key.clone(), value.clone()).await?; - value_tracker.insert(key.clone(), value.clone()); + store.set(key.clone(), value.clone().into()).await?; + value_tracker.insert(key.clone(), value.into()); } else { match store.remove(key.clone()).await { Ok(_) => { @@ -192,7 +192,7 @@ async fn randomised_retrieval() -> Result<()> { let store = KvStore::open(temp_dir.path())?; for (k, v) in value_tracker { - assert_eq!(store.get(k).await?.unwrap().value, Some(v)); + assert_eq!(store.get(k).await?.unwrap(), v); } Ok(()) @@ -208,7 +208,7 @@ async fn concurrent_set() -> Result<()> { let barrier = barrier.clone(); tokio::spawn(async move { store - .set(format!("key{}", i), format!("value{}", i)) + .set(format!("key{}", i), format!("value{}", i).into()) .await .unwrap(); barrier.wait().await; @@ -218,8 +218,8 @@ async fn concurrent_set() -> Result<()> { for i in 0..1000 { assert_eq!( - store.get(format!("key{}", i)).await.unwrap().unwrap().value, - Some(format!("value{}", i)) + store.get(format!("key{}", i)).await.unwrap().unwrap().0, + Some(format!("value{}", i).into()) ); } @@ -228,8 +228,8 @@ async fn concurrent_set() -> Result<()> { let store = KvStore::open(temp_dir.path())?; for i in 0..1000 { assert_eq!( - store.get(format!("key{}", i)).await.unwrap().unwrap().value, - Some(format!("value{}", i)) + store.get(format!("key{}", i)).await.unwrap().unwrap().0, + Some(format!("value{}", i).into()) ); } @@ -242,7 +242,7 @@ async fn concurrent_get() -> Result<()> { let store = KvStore::open(temp_dir.path())?; for i in 0..100 { store - .set(format!("key{}", i), format!("value{}", i)) + .set(format!("key{}", i), format!("value{}", i).into()) .await .unwrap(); } @@ -259,8 +259,8 @@ async fn concurrent_get() -> Result<()> { .await .unwrap() .unwrap() - .value, - Some(format!("value{}", key_id)) + .0, + Some(format!("value{}", key_id).into()) ); } }); @@ -285,8 +285,8 @@ async fn concurrent_get() -> Result<()> { .await .unwrap() .unwrap() - .value, - Some(format!("value{}", key_id)) + .0, + Some(format!("value{}", key_id).into()) ); } }); From 6aec007f15ee26aa79f3a4e7c5e9d85231cd85da Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Thu, 18 Apr 2024 22:42:18 +0100 Subject: [PATCH 4/6] fmt --- src/store.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/store.rs b/src/store.rs index 22feec8..0c58765 100644 --- a/src/store.rs +++ b/src/store.rs @@ -26,7 +26,6 @@ pub enum Operation { #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct StoreValue(pub Option>); - impl From<&str> for StoreValue { fn from(value: &str) -> Self { StoreValue(Some(value.into())) From f593c69790acb4ecd92f7832d8d6a6524c7be506 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 19 Apr 2024 17:20:59 +0100 Subject: [PATCH 5/6] s/StoreValue/Value --- benches/sqrl_bench.rs | 2 +- src/bin/sqrl-client.rs | 6 ++---- src/client.rs | 6 +++--- src/engine.rs | 6 +++--- src/lib.rs | 2 +- src/replication/server.rs | 8 ++++---- src/server.rs | 4 ++-- src/store.rs | 16 ++++++++-------- tests/kv_store.rs | 4 ++-- 9 files changed, 26 insertions(+), 28 deletions(-) diff --git a/benches/sqrl_bench.rs b/benches/sqrl_bench.rs index 6fff251..38959ac 100644 --- a/benches/sqrl_bench.rs +++ b/benches/sqrl_bench.rs @@ -9,7 +9,7 @@ fn write_direct(c: &mut criterion::Criterion) { c.bench_with_input(BenchmarkId::new("write", "store"), &store, |b, s| { b.to_async(&rt).iter(|| async { - s.set("key".to_string(), "value".to_string()).await.unwrap(); + s.set("key".to_string(), "value".into()).await.unwrap(); }) }); } diff --git a/src/bin/sqrl-client.rs b/src/bin/sqrl-client.rs index 4b978f6..9bdc5cd 100644 --- a/src/bin/sqrl-client.rs +++ b/src/bin/sqrl-client.rs @@ -2,7 +2,7 @@ use clap::Parser; use sqrl::action::Action; use sqrl::client::{Client, RemoteNodeClient}; -use sqrl::StoreValue; +use sqrl::Value; #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] @@ -21,9 +21,7 @@ async fn main() -> anyhow::Result<()> { match cli.subcmd { Action::Set { key, value } => { - client - .set(key, StoreValue(Some(value.into_bytes()))) - .await?; + client.set(key, Value(Some(value.into_bytes()))).await?; } Action::Get { key } => { let response = client.get(key).await?; diff --git a/src/client.rs b/src/client.rs index a5be70a..346e73c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,7 +6,7 @@ use crate::proto::action_client::ActionClient; use crate::proto::{Acknowledgement, GetResponse, RemoveRequest}; use crate::proto::{GetRequest, SetRequest}; pub use crate::replication::ReplicatedServer; -use crate::store::StoreValue; +use crate::store::Value; use crate::Result; /// A client used for interacting with the [`KvStore`] via gRPC requests. @@ -14,7 +14,7 @@ use crate::Result; pub trait Client { async fn get(&mut self, key: String) -> anyhow::Result>; - async fn set(&mut self, key: String, value: StoreValue) -> anyhow::Result; + async fn set(&mut self, key: String, value: Value) -> anyhow::Result; async fn remove(&mut self, key: String) -> anyhow::Result; } @@ -46,7 +46,7 @@ impl RemoteNodeClient { #[tonic::async_trait] impl Client for RemoteNodeClient { - async fn set(&mut self, key: String, value: StoreValue) -> anyhow::Result { + async fn set(&mut self, key: String, value: Value) -> anyhow::Result { let value = value.0.unwrap_or_default(); let req = tonic::Request::new(SetRequest { key, diff --git a/src/engine.rs b/src/engine.rs index 3774883..89b1e64 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,6 +1,6 @@ use std::future::Future; -use crate::{store::StoreValue, Result}; +use crate::{store::Value, Result}; /// Generic trait implementation for pluggable storage engines outside of the one /// implemented by this crate. @@ -12,7 +12,7 @@ use crate::{store::StoreValue, Result}; /// /// [`KvStore`]: crate::store::KvStore pub trait KvsEngine: Clone + Send + Sync + 'static { - fn set(&self, key: String, value: StoreValue) -> impl Future>; - fn get(&self, key: String) -> impl Future>>; + fn set(&self, key: String, value: Value) -> impl Future>; + fn get(&self, key: String) -> impl Future>>; fn remove(&self, key: String) -> impl Future>; } diff --git a/src/lib.rs b/src/lib.rs index bc7d996..f17b61e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ mod store; pub use engine::KvsEngine; pub use error::Error as KvStoreError; pub use server::StandaloneServer; -pub use store::{KvStore, StoreValue}; +pub use store::{KvStore, Value}; pub mod action; pub mod client; diff --git a/src/replication/server.rs b/src/replication/server.rs index 5b33750..03a3840 100644 --- a/src/replication/server.rs +++ b/src/replication/server.rs @@ -8,7 +8,7 @@ use tracing::{debug, info}; use crate::client::{Client, RemoteNodeClient}; use crate::proto::action_server::{Action, ActionServer}; use crate::proto::{Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest}; -use crate::store::StoreValue; +use crate::store::Value; use crate::{KvsEngine, StandaloneServer}; /// Wrapped implementation of a [`StandaloneServer`] with an awareness of multiple @@ -68,7 +68,7 @@ impl Action for ReplicatedServer { let key = req.key.clone(); let response = match self.server.store.get(key).await.unwrap() { Some(r) => r, - None => StoreValue(None), + None => Value(None), }; Ok(tonic::Response::new(GetResponse { value: response.0 })) } @@ -81,14 +81,14 @@ impl Action for ReplicatedServer { debug!("Setting value to local store"); self.server .store - .set(req.key.clone(), StoreValue(Some(req.value.clone()))) + .set(req.key.clone(), Value(Some(req.value.clone()))) .await .unwrap(); debug!("Replicating to remote replicas"); futures::stream::iter(self.remote_replicas.lock().await.iter_mut()) .for_each(|r| async { - r.set(req.key.clone(), StoreValue(Some(req.value.clone()))) + r.set(req.key.clone(), Value(Some(req.value.clone()))) .await .unwrap(); }) diff --git a/src/server.rs b/src/server.rs index a733653..7c5062f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,7 @@ use crate::proto::{ action_server::{Action, ActionServer}, Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest, }; -use crate::store::StoreValue; +use crate::store::Value; use crate::KvStore; use crate::KvsEngine; use std::{net::SocketAddr, sync::Arc}; @@ -58,7 +58,7 @@ impl Action for StandaloneServer { ) -> tonic::Result, tonic::Status> { let req = req.into_inner(); self.store - .set(req.key, StoreValue(Some(req.value))) + .set(req.key, Value(Some(req.value))) .await .unwrap(); Ok(tonic::Response::new(Acknowledgement { success: true })) diff --git a/src/store.rs b/src/store.rs index 0c58765..56da230 100644 --- a/src/store.rs +++ b/src/store.rs @@ -24,17 +24,17 @@ pub enum Operation { } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] -pub struct StoreValue(pub Option>); +pub struct Value(pub Option>); -impl From<&str> for StoreValue { +impl From<&str> for Value { fn from(value: &str) -> Self { - StoreValue(Some(value.into())) + Value(Some(value.into())) } } -impl From for StoreValue { +impl From for Value { fn from(value: String) -> Self { - StoreValue(Some(value.into_bytes())) + Value(Some(value.into_bytes())) } } @@ -92,7 +92,7 @@ struct LogEntry { /// the entry with the most recent timestamp wins. timestamp: i64, key: String, - value: Option, + value: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -118,7 +118,7 @@ struct StoreConfig { impl KvsEngine for KvStore { /// Set the value of a key by inserting the value into the store for the given key. - async fn set(&self, key: String, value: StoreValue) -> Result<()> { + async fn set(&self, key: String, value: Value) -> Result<()> { debug!(key, "Setting key"); let timestamp = chrono::Utc::now().timestamp(); let entry = LogEntry { @@ -165,7 +165,7 @@ impl KvsEngine for KvStore { /// /// The timestamp is typically used with replication, as the value acts as /// a version number and conflict resolution mechanism. - async fn get(&self, key: String) -> Result> { + async fn get(&self, key: String) -> Result> { debug!(key, "Getting key"); match self.keydir.get(&key) { Some(entry) => { diff --git a/tests/kv_store.rs b/tests/kv_store.rs index 98c34fa..1a0f08b 100644 --- a/tests/kv_store.rs +++ b/tests/kv_store.rs @@ -1,5 +1,5 @@ use rand::Rng; -use sqrl::{KvStore, KvsEngine, Result, StoreValue}; +use sqrl::{KvStore, KvsEngine, Result, Value}; use std::{collections::HashMap, sync::Arc}; use tempfile::TempDir; use tokio::sync::Barrier; @@ -161,7 +161,7 @@ async fn randomised_retrieval() -> Result<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let store = KvStore::open(temp_dir.path())?; - let mut value_tracker: HashMap = HashMap::new(); + let mut value_tracker: HashMap = HashMap::new(); let mut rng = rand::thread_rng(); for i in 0..1000 { let key = format!("key{}", i); From b6c414ebc8d2a2bd3a41fc0df2f9a5443fffebe4 Mon Sep 17 00:00:00 2001 From: Jack Dockerty Date: Fri, 19 Apr 2024 17:27:38 +0100 Subject: [PATCH 6/6] docstring for Value --- src/store.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/store.rs b/src/store.rs index 56da230..dcb42c5 100644 --- a/src/store.rs +++ b/src/store.rs @@ -23,6 +23,12 @@ pub enum Operation { Remove, } +/// Value for the store, associated to a key. +/// +/// This is a simple wrapper around a [`Vec`] which represent any value that +/// can be (de)serialised into a [`LogEntry`]. +/// +/// TODO: Can this be better represented as a generic, rather than a "new type"? #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct Value(pub Option>);