Skip to content

Commit

Permalink
Changes go through an in-memory transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
acerone85 committed Oct 1, 2024
1 parent 50afc1a commit 4c42133
Showing 1 changed file with 54 additions and 8 deletions.
62 changes: 54 additions & 8 deletions crates/fuel-core/src/state/historical_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use std::{
},
},
Arc,
Mutex,
},
};

Expand All @@ -96,7 +97,9 @@ pub struct HistoricalRocksDB<Description> {
/// The [`StateRewindPolicy`] used by the historical rocksdb
state_rewind_policy: StateRewindPolicy,
/// The Description of the database.
db: RocksDb<Historical<Description>>,
db: Arc<RocksDb<Historical<Description>>>,
/// Changes that have been committed in memory, but not to rocksdb
in_memory_changes: Mutex<StorageTransaction<Arc<RocksDb<Historical<Description>>>>>,
/// Indicates whether the [`Column::ModificationsHistory`]
/// is being migrated to [`Column::ModificationsHistoryV2`]
// TODO: Change this to an Atomic variable. (Release for Writing, Acquire for reading)
Expand All @@ -111,9 +114,16 @@ where
db: RocksDb<Historical<Description>>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let db = Arc::new(db);
let db_ref = db.clone();
Ok(Self {
state_rewind_policy,
db,
in_memory_changes: Mutex::new(StorageTransaction::transaction(
db_ref,
ConflictPolicy::Overwrite,
Changes::default(),
)),
modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)),
})
}
Expand All @@ -123,10 +133,18 @@ where
capacity: Option<usize>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let db = RocksDb::<Historical<Description>>::default_open(path, capacity)?;
let db = Arc::new(RocksDb::<Historical<Description>>::default_open(
path, capacity,
)?);
let db_ref = db.clone();
Ok(Self {
state_rewind_policy,
db,
in_memory_changes: Mutex::new(StorageTransaction::transaction(
db_ref,
ConflictPolicy::Overwrite,
Changes::default(),
)),
modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)),
})
}
Expand Down Expand Up @@ -350,7 +368,7 @@ where
}

fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
let mut storage_transaction = self.db.read_transaction();
let mut storage_transaction = (*self.db).read_transaction();

let modifications_history_migration_in_progress = self
.modifications_history_migration_in_progress
Expand Down Expand Up @@ -580,7 +598,7 @@ where
}
}

impl<Description> IterableStore for HistoricalRocksDB<Description>
impl<'a, Description> IterableStore for HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
Expand Down Expand Up @@ -617,14 +635,42 @@ where
height: Option<Description::Height>,
changes: Changes,
) -> StorageResult<()> {
let mut storage_transaction =
StorageTransaction::transaction(&self.db, ConflictPolicy::Overwrite, changes);
let mut storage_transaction = StorageTransaction::transaction(
&*self.db,
ConflictPolicy::Overwrite,
changes,
);
if let Some(height) = height {
self.store_modifications_history(&mut storage_transaction, &height)?;
}

self.db
.commit_changes(&storage_transaction.into_changes())?;
// Commit into the in_memory_db that also contains changes from the migration
let mut cumulative_tx = self
.in_memory_changes
.lock()
.map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!(
"Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}"
))
})?;

// The type of cumulative_tx is InMemoryTransaction<Arc<RocksDb<Historical<_>>>>,
// which does not implement the Modifiable trait. Instead, we create a new
// transaction of type InMemoryTransaction<&RocksDb<Historical<_>>>,
// anc dommut both the current and cumulative changes to it.

let cumulative_changes = cumulative_tx.changes().clone();

let final_transaction = StorageTransaction::transaction(
storage_transaction,
ConflictPolicy::Overwrite,
cumulative_changes,
)
.commit()?;

cumulative_tx.reset_changes();

self.db.commit_changes(&final_transaction.into_changes())?;
Ok(())
}

Expand Down

0 comments on commit 4c42133

Please sign in to comment.