Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce Value #22

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/sqrl_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn write_direct(c: &mut criterion::Criterion) {

c.bench_with_input(BenchmarkId::new("write", "store"), &store, |b, s| {
b.to_async(&rt).iter(|| async {
s.set("key".to_string(), "value".to_string()).await.unwrap();
s.set("key".to_string(), "value".into()).await.unwrap();
})
});
}
Expand Down
5 changes: 2 additions & 3 deletions proto/actions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ message Acknowledgement {

message SetRequest {
string key = 1;
string value = 2;
bytes value = 2;
int64 timestamp = 3;
}

Expand All @@ -23,8 +23,7 @@ message GetRequest {
}

message GetResponse {
optional string value = 1;
int64 timestamp = 2;
optional bytes value = 1;
}

message RemoveRequest {
Expand Down
8 changes: 6 additions & 2 deletions src/bin/sqrl-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;

use sqrl::action::Action;
use sqrl::client::{Client, RemoteNodeClient};
use sqrl::Value;

#[derive(Debug, Parser)]
#[command(author, version, about, long_about = None)]
Expand All @@ -20,12 +21,15 @@ async fn main() -> anyhow::Result<()> {

match cli.subcmd {
Action::Set { key, value } => {
client.set(key, value).await?;
client.set(key, Value(Some(value.into_bytes()))).await?;
}
Action::Get { key } => {
let response = client.get(key).await?;
match response {
Some(v) => println!("{}", v.value.unwrap()),
Some(v) => {
let out = String::from_utf8_lossy(v.value());
println!("{out}");
}
None => println!("Key not found"),
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use crate::proto::action_client::ActionClient;
use crate::proto::{Acknowledgement, GetResponse, RemoveRequest};
use crate::proto::{GetRequest, SetRequest};
pub use crate::replication::ReplicatedServer;
use crate::store::Value;
use crate::Result;

/// A client used for interacting with the [`KvStore`] via gRPC requests.
#[tonic::async_trait]
pub trait Client {
async fn get(&mut self, key: String) -> anyhow::Result<Option<GetResponse>>;

async fn set(&mut self, key: String, value: String) -> anyhow::Result<Acknowledgement>;
async fn set(&mut self, key: String, value: Value) -> anyhow::Result<Acknowledgement>;

async fn remove(&mut self, key: String) -> anyhow::Result<Acknowledgement>;
}
Expand Down Expand Up @@ -45,7 +46,8 @@ impl RemoteNodeClient {

#[tonic::async_trait]
impl Client for RemoteNodeClient {
async fn set(&mut self, key: String, value: String) -> anyhow::Result<Acknowledgement> {
async fn set(&mut self, key: String, value: Value) -> anyhow::Result<Acknowledgement> {
let value = value.0.unwrap_or_default();
let req = tonic::Request::new(SetRequest {
key,
value,
Expand Down
6 changes: 3 additions & 3 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;

use crate::{proto::GetResponse, Result};
use crate::{store::Value, Result};

/// Generic trait implementation for pluggable storage engines outside of the one
/// implemented by this crate.
Expand All @@ -12,7 +12,7 @@ use crate::{proto::GetResponse, Result};
///
/// [`KvStore`]: crate::store::KvStore
pub trait KvsEngine: Clone + Send + Sync + 'static {
fn set(&self, key: String, value: String) -> impl Future<Output = Result<()>>;
fn get(&self, key: String) -> impl Future<Output = Result<Option<GetResponse>>>;
fn set(&self, key: String, value: Value) -> impl Future<Output = Result<()>>;
fn get(&self, key: String) -> impl Future<Output = Result<Option<Value>>>;
fn remove(&self, key: String) -> impl Future<Output = Result<()>>;
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod store;
pub use engine::KvsEngine;
pub use error::Error as KvStoreError;
pub use server::StandaloneServer;
pub use store::KvStore;
pub use store::{KvStore, Value};
pub mod action;
pub mod client;

Expand Down
26 changes: 13 additions & 13 deletions src/replication/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::{debug, info};
use crate::client::{Client, RemoteNodeClient};
use crate::proto::action_server::{Action, ActionServer};
use crate::proto::{Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest};
use crate::store::Value;
use crate::{KvsEngine, StandaloneServer};

/// Wrapped implementation of a [`StandaloneServer`] with an awareness of multiple
Expand Down Expand Up @@ -67,12 +68,9 @@ impl Action for ReplicatedServer {
let key = req.key.clone();
let response = match self.server.store.get(key).await.unwrap() {
Some(r) => r,
None => GetResponse {
value: None,
timestamp: 0,
},
None => Value(None),
};
Ok(tonic::Response::new(response))
Ok(tonic::Response::new(GetResponse { value: response.0 }))
}

async fn set(
Expand All @@ -83,14 +81,16 @@ impl Action for ReplicatedServer {
debug!("Setting value to local store");
self.server
.store
.set(req.key.clone(), req.value.clone())
.set(req.key.clone(), Value(Some(req.value.clone())))
.await
.unwrap();

debug!("Replicating to remote replicas");
futures::stream::iter(self.remote_replicas.lock().await.iter_mut())
.for_each(|r| async {
r.set(req.key.clone(), req.value.clone()).await.unwrap();
r.set(req.key.clone(), Value(Some(req.value.clone())))
.await
.unwrap();
})
.await;

Expand Down Expand Up @@ -179,7 +179,7 @@ mod test {
thread::sleep(Duration::from_millis(1500));

replicated_client
.set("key1".to_string(), "value1".to_string())
.set("key1".to_string(), "value1".into())
.await
.unwrap();

Expand All @@ -192,7 +192,7 @@ mod test {
.unwrap()
.unwrap()
.value,
Some("value1".to_string()),
Some("value1".into()),
"No replication for initial value"
);
assert_eq!(
Expand All @@ -206,12 +206,12 @@ mod test {
.unwrap()
.unwrap()
.value,
Some("value1".to_string()),
Some("value1".into()),
"No replication for initial value"
);

replicated_client
.set("key1".to_string(), "overwritten".to_string())
.set("key1".to_string(), "overwritten".into())
.await
.unwrap();
wait_for_replication();
Expand All @@ -222,7 +222,7 @@ mod test {
.unwrap()
.unwrap()
.value,
Some("overwritten".to_string()),
Some("overwritten".into()),
"No replication for overwritten value"
);
assert_eq!(
Expand All @@ -232,7 +232,7 @@ mod test {
.unwrap()
.unwrap()
.value,
Some("overwritten".to_string()),
Some("overwritten".into()),
"No replication for overwritten value"
);

Expand Down
16 changes: 7 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::proto::{
action_server::{Action, ActionServer},
Acknowledgement, GetRequest, GetResponse, RemoveRequest, SetRequest,
};
use crate::store::Value;
use crate::KvStore;
use crate::KvsEngine;
use std::{net::SocketAddr, sync::Arc};
Expand Down Expand Up @@ -46,14 +47,8 @@ impl Action for StandaloneServer {
) -> tonic::Result<tonic::Response<GetResponse>, tonic::Status> {
let req = req.into_inner();
match self.store.get(req.key).await.unwrap() {
Some(v) => Ok(tonic::Response::new(GetResponse {
value: v.value,
timestamp: v.timestamp,
})),
None => Ok(tonic::Response::new(GetResponse {
value: None,
timestamp: 0,
})),
Some(value) => Ok(tonic::Response::new(GetResponse { value: value.0 })),
None => Ok(tonic::Response::new(GetResponse { value: None })),
}
}

Expand All @@ -62,7 +57,10 @@ impl Action for StandaloneServer {
req: tonic::Request<SetRequest>,
) -> tonic::Result<tonic::Response<Acknowledgement>, tonic::Status> {
let req = req.into_inner();
self.store.set(req.key, req.value).await.unwrap();
self.store
.set(req.key, Value(Some(req.value)))
.await
.unwrap();
Ok(tonic::Response::new(Acknowledgement { success: true }))
}

Expand Down
40 changes: 27 additions & 13 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::engine::KvsEngine;
use crate::proto::GetResponse;
use crate::{KvStoreError, Result};
use crate::{LOG_PREFIX, MAX_LOG_FILE_SIZE};
use dashmap::DashMap;
Expand All @@ -24,6 +23,27 @@ pub enum Operation {
Remove,
}

/// Value for the store, associated to a key.
///
/// This is a simple wrapper around a [`Vec<u8>`] which represent any value that
/// can be (de)serialised into a [`LogEntry`].
///
/// TODO: Can this be better represented as a generic, rather than a "new type"?
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Value(pub Option<Vec<u8>>);

impl From<&str> for Value {
fn from(value: &str) -> Self {
Value(Some(value.into()))
}
}

impl From<String> for Value {
fn from(value: String) -> Self {
Value(Some(value.into_bytes()))
}
}

#[derive(Clone, Debug)]
pub struct StoreWriter {
active_log_file: PathBuf,
Expand Down Expand Up @@ -78,7 +98,7 @@ struct LogEntry {
/// the entry with the most recent timestamp wins.
timestamp: i64,
key: String,
value: Option<String>,
value: Option<Value>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -104,8 +124,8 @@ struct StoreConfig {

impl KvsEngine for KvStore {
/// Set the value of a key by inserting the value into the store for the given key.
async fn set(&self, key: String, value: String) -> Result<()> {
debug!(key, value, "Setting key");
async fn set(&self, key: String, value: Value) -> Result<()> {
debug!(key, "Setting key");
let timestamp = chrono::Utc::now().timestamp();
let entry = LogEntry {
timestamp,
Expand Down Expand Up @@ -151,7 +171,7 @@ impl KvsEngine for KvStore {
///
/// The timestamp is typically used with replication, as the value acts as
/// a version number and conflict resolution mechanism.
async fn get(&self, key: String) -> Result<Option<GetResponse>> {
async fn get(&self, key: String) -> Result<Option<Value>> {
debug!(key, "Getting key");
match self.keydir.get(&key) {
Some(entry) => {
Expand All @@ -166,13 +186,7 @@ impl KvsEngine for KvStore {
entry_file.seek(SeekFrom::Start(entry.offset as u64))?;
let log_entry: LogEntry = bincode::deserialize_from(entry_file)?;
match log_entry.value {
Some(value) => {
debug!(value, "Value exists");
Ok(Some(GetResponse {
value: Some(value),
timestamp: log_entry.timestamp,
}))
}
Some(value) => Ok(Some(value)),
// This is a tombstone value and equates to a deleted key and
// the "Key not found" scenario.
None => {
Expand All @@ -190,7 +204,7 @@ impl KvsEngine for KvStore {
debug!(key, "Removing key");
match self.keydir.remove(&key) {
Some(_entry) => {
let tombstone = LogEntry {
let tombstone: LogEntry = LogEntry {
timestamp: chrono::Utc::now().timestamp(),
operation: Operation::Remove,
key: key.clone(),
Expand Down
Loading
Loading