diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index fdb1ab1da42..e3abf055438 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -70,6 +70,7 @@ use std::{ }, }, Arc, + Mutex, }, }; @@ -96,7 +97,9 @@ pub struct HistoricalRocksDB { /// The [`StateRewindPolicy`] used by the historical rocksdb state_rewind_policy: StateRewindPolicy, /// The Description of the database. - db: RocksDb>, + db: Arc>>, + /// Changes that have been committed in memory, but not to rocksdb + in_memory_changes: Mutex>>>>, /// Indicates whether the [`Column::ModificationsHistory`] /// is being migrated to [`Column::ModificationsHistoryV2`] // TODO: Change this to an Atomic variable. (Release for Writing, Acquire for reading) @@ -111,9 +114,16 @@ where db: RocksDb>, state_rewind_policy: StateRewindPolicy, ) -> DatabaseResult { + let db = Arc::new(db); + let db_ref = db.clone(); Ok(Self { state_rewind_policy, db, + in_memory_changes: Mutex::new(StorageTransaction::transaction( + db_ref, + ConflictPolicy::Overwrite, + Changes::default(), + )), modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)), }) } @@ -123,10 +133,18 @@ where capacity: Option, state_rewind_policy: StateRewindPolicy, ) -> DatabaseResult { - let db = RocksDb::>::default_open(path, capacity)?; + let db = Arc::new(RocksDb::>::default_open( + path, capacity, + )?); + let db_ref = db.clone(); Ok(Self { state_rewind_policy, db, + in_memory_changes: Mutex::new(StorageTransaction::transaction( + db_ref, + ConflictPolicy::Overwrite, + Changes::default(), + )), modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)), }) } @@ -350,7 +368,7 @@ where } fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> { - let mut storage_transaction = self.db.read_transaction(); + let mut storage_transaction = (*self.db).read_transaction(); let modifications_history_migration_in_progress = self .modifications_history_migration_in_progress @@ -580,7 +598,7 @@ where } } -impl IterableStore for HistoricalRocksDB +impl<'a, Description> IterableStore for HistoricalRocksDB where Description: DatabaseDescription, { @@ -617,14 +635,42 @@ where height: Option, changes: Changes, ) -> StorageResult<()> { - let mut storage_transaction = - StorageTransaction::transaction(&self.db, ConflictPolicy::Overwrite, changes); + let mut storage_transaction = StorageTransaction::transaction( + &*self.db, + ConflictPolicy::Overwrite, + changes, + ); if let Some(height) = height { self.store_modifications_history(&mut storage_transaction, &height)?; } - self.db - .commit_changes(&storage_transaction.into_changes())?; + // Commit into the in_memory_db that also contains changes from the migration + let mut cumulative_tx = self + .in_memory_changes + .lock() + .map_err(|poisoned| { + StorageError::Other(anyhow::anyhow!( + "Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}" + )) + })?; + + // The type of cumulative_tx is InMemoryTransaction>>>, + // which does not implement the Modifiable trait. Instead, we create a new + // transaction of type InMemoryTransaction<&RocksDb>>, + // anc dommut both the current and cumulative changes to it. + + let cumulative_changes = cumulative_tx.changes().clone(); + + let final_transaction = StorageTransaction::transaction( + storage_transaction, + ConflictPolicy::Overwrite, + cumulative_changes, + ) + .commit()?; + + cumulative_tx.reset_changes(); + + self.db.commit_changes(&final_transaction.into_changes())?; Ok(()) }