Skip to content

Commit

Permalink
refactor: make NeptuneLevelDb api take &mut self.
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-da committed Jan 22, 2024
1 parent 4912ccd commit 150dd84
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ opt-level = 0

[patch.crates-io]
# needed for now.
twenty-first = { git = "https://github.com/Neptune-Crypto/twenty-first.git", rev = "e21a06c3ab9048fc8b6d80c616c0ae2c7b272298" }
twenty-first = { git = "https://github.com/Neptune-Crypto/twenty-first.git", rev = "fac4ead7d0ad0f816064af1948d46689b2430bbf" }
35 changes: 24 additions & 11 deletions src/database/neptune_leveldb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use tokio::task;
use twenty_first::leveldb::{
batch::WriteBatch,
Expand All @@ -23,6 +22,20 @@ where
_value: PhantomData<Value>,
}

impl<Key, Value> Clone for NeptuneLevelDbInternal<Key, Value>
where
Key: Serialize + DeserializeOwned,
Value: Serialize + DeserializeOwned,
{
fn clone(&self) -> Self {
Self {
database: self.database.clone(),
_key: Default::default(),
_value: Default::default(),
}
}
}

// We have to implement `Debug` for `NeptuneLevelDbInternal` as the `State` struct
// contains a database object, and `State` is used as input argument
// to multiple functions where logging is enabled with the `instrument`
Expand Down Expand Up @@ -74,13 +87,13 @@ where
value_bytes.map(|bytes| bincode::deserialize(&bytes).unwrap())
}

fn put(&self, key: Key, value: Value) {
fn put(&mut self, key: Key, value: Value) {
let key_bytes: Vec<u8> = bincode::serialize(&key).unwrap();
let value_bytes: Vec<u8> = bincode::serialize(&value).unwrap();
self.database.put(&key_bytes, &value_bytes).unwrap();
}

fn batch_write(&self, entries: impl IntoIterator<Item = (Key, Value)>) {
fn batch_write(&mut self, entries: impl IntoIterator<Item = (Key, Value)>) {
let batch = WriteBatch::new();
for (key, value) in entries.into_iter() {
let key_bytes: Vec<u8> = bincode::serialize(&key).unwrap();
Expand All @@ -91,7 +104,7 @@ where
self.database.write(&batch, true).unwrap();
}

fn delete(&self, key: Key) -> Option<Value> {
fn delete(&mut self, key: Key) -> Option<Value> {
let key_bytes: Vec<u8> = bincode::serialize(&key).unwrap(); // add safety
let value_bytes: Option<Vec<u8>> = self.database.get(&key_bytes).unwrap();
let value_object = value_bytes.map(|bytes| bincode::deserialize(&bytes).unwrap());
Expand All @@ -103,7 +116,7 @@ where
}
}

fn flush(&self) {
fn flush(&mut self) {
self.database
.write(&WriteBatch::new(), true)
.expect("Database flushing to disk must succeed");
Expand Down Expand Up @@ -132,7 +145,7 @@ where
/// so that the tokio runtime can run the blocking IO on a thread where blocking
/// is acceptable
#[derive(Clone)]
pub struct NeptuneLevelDb<Key, Value>(Arc<NeptuneLevelDbInternal<Key, Value>>)
pub struct NeptuneLevelDb<Key, Value>(NeptuneLevelDbInternal<Key, Value>)
where
Key: Serialize + DeserializeOwned,
Value: Serialize + DeserializeOwned;
Expand Down Expand Up @@ -185,7 +198,7 @@ where
task::spawn_blocking(move || NeptuneLevelDbInternal::new(&path, &options_async.into()))
.await??;

Ok(Self(Arc::new(db)))
Ok(Self(db))
}

/// Get database value asynchronously
Expand All @@ -196,7 +209,7 @@ where

/// Set database value asynchronously
pub async fn put(&mut self, key: Key, value: Value) {
let inner = self.0.clone();
let mut inner = self.0.clone();
task::spawn_blocking(move || inner.put(key, value))
.await
.unwrap()
Expand All @@ -207,23 +220,23 @@ where
&mut self,
entries: impl IntoIterator<Item = (Key, Value)> + Send + Sync + 'static,
) {
let inner = self.0.clone();
let mut inner = self.0.clone();
task::spawn_blocking(move || inner.batch_write(entries))
.await
.unwrap()
}

/// Delete database value asynchronously
pub async fn delete(&mut self, key: Key) -> Option<Value> {
let inner = self.0.clone();
let mut inner = self.0.clone();
task::spawn_blocking(move || inner.delete(key))
.await
.unwrap()
}

/// Delete database value asynchronously
pub async fn flush(&mut self) {
let inner = self.0.clone();
let mut inner = self.0.clone();
task::spawn_blocking(move || inner.flush()).await.unwrap()
}
}
Expand Down

0 comments on commit 150dd84

Please sign in to comment.