diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index e3abf055438..18be7d13bf8 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -42,7 +42,6 @@ use fuel_core_storage::{ transactional::{ Changes, ConflictPolicy, - ReadTransaction, StorageTransaction, }, Error as StorageError, @@ -97,9 +96,9 @@ pub struct HistoricalRocksDB { /// The [`StateRewindPolicy`] used by the historical rocksdb state_rewind_policy: StateRewindPolicy, /// The Description of the database. - db: Arc>>, + db: RocksDb>, /// Changes that have been committed in memory, but not to rocksdb - in_memory_changes: Mutex>>>>, + migration_changes: Mutex, /// Indicates whether the [`Column::ModificationsHistory`] /// is being migrated to [`Column::ModificationsHistoryV2`] // TODO: Change this to an Atomic variable. (Release for Writing, Acquire for reading) @@ -114,16 +113,10 @@ where db: RocksDb>, state_rewind_policy: StateRewindPolicy, ) -> DatabaseResult { - let db = Arc::new(db); - let db_ref = db.clone(); Ok(Self { state_rewind_policy, db, - in_memory_changes: Mutex::new(StorageTransaction::transaction( - db_ref, - ConflictPolicy::Overwrite, - Changes::default(), - )), + migration_changes: Mutex::new(Changes::default()), modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)), }) } @@ -133,18 +126,11 @@ where capacity: Option, state_rewind_policy: StateRewindPolicy, ) -> DatabaseResult { - let db = Arc::new(RocksDb::>::default_open( - path, capacity, - )?); - let db_ref = db.clone(); + let db = RocksDb::>::default_open(path, capacity)?; Ok(Self { state_rewind_policy, db, - in_memory_changes: Mutex::new(StorageTransaction::transaction( - db_ref, - ConflictPolicy::Overwrite, - Changes::default(), - )), + migration_changes: Mutex::new(Changes::default()), modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)), }) } @@ -221,11 +207,14 @@ where Ok(ViewAtHeight::new(rollback_height, latest_view)) } - fn store_modifications_history( + fn store_modifications_history( &self, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, height: &Description::Height, - ) -> StorageResult<()> { + ) -> StorageResult<()> + where + T: KeyValueInspect>, + { let modifications_history_migration_in_progress = self .modifications_history_migration_in_progress .load(Acquire); @@ -368,7 +357,24 @@ where } fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> { - let mut storage_transaction = (*self.db).read_transaction(); + let mut migration_changes_lock_guard = + self.migration_changes.lock().map_err(|poisoned| { + StorageError::Other(anyhow::anyhow!("Lock was poisoned: {poisoned:?}")) + })?; + + let migration_changes = std::mem::take(&mut *migration_changes_lock_guard); + + let mut migration_transaction = StorageTransaction::transaction( + &self.db, + ConflictPolicy::Overwrite, + migration_changes, + ); + + let mut storage_transaction = StorageTransaction::transaction( + &mut migration_transaction, + ConflictPolicy::Overwrite, + Changes::default(), + ); let modifications_history_migration_in_progress = self .modifications_history_migration_in_progress @@ -394,23 +400,39 @@ where ) .commit()?; + storage_transaction.commit()?; + self.db - .commit_changes(&storage_transaction.into_changes())?; + .commit_changes(&migration_transaction.into_changes())?; Ok(()) } /// Migrates a ModificationHistory key-value pair from V1 to V2. - /// The migration fails if a concurrent transaction which rollbacks - /// to the `height` being migrated commits before this - /// migration transaction commits. + /// The migration is lazy, in that the changes necessary to perform + /// the mivration from ModificationHistoryV1 to ModificationHistoryV2 + /// are simply recorded, and will be flushed to the database when it + /// commits or rollbacks to a new height. pub fn migrate_modifications_history_at_height( &self, height: u64, ) -> StorageResult<()> { - let mut migration_transaction = StorageTransaction::transaction( + let mut changes_guard = self.migration_changes.lock().map_err(|poisoned| { + StorageError::Other(anyhow::anyhow!("Lock is poisoned: {poisoned:?}")) + })?; + + let current_changes = std::mem::take(&mut *changes_guard); + + let mut cumulative_tx = StorageTransaction::transaction( &self.db, - ConflictPolicy::Fail, + ConflictPolicy::Overwrite, + current_changes, + ); + + // TODO: I could add a second migration transaction and commit the second transaction into the first + let mut migration_transaction = StorageTransaction::transaction( + &mut cumulative_tx, + ConflictPolicy::Overwrite, Changes::default(), ); @@ -420,14 +442,16 @@ where if let Some(v1_changes) = v1_changes { migration_transaction .storage_as_mut::>() - .insert(&height, &v1_changes)?; + .insert(&height, &v1_changes)?; // TODO: If this is an error we want to at least write back the old changes + }; - self.db - .commit_changes(&migration_transaction.into_changes())?; - Ok(()) - } else { - Ok(()) - } + // Add the changes into the cumulative changes + migration_transaction.commit()?; + + let cumulative_changes = cumulative_tx.into_changes(); + *changes_guard = cumulative_changes; + + Ok(()) } pub fn v1_entries(&self) -> BoxedIter> { @@ -447,13 +471,14 @@ where // `ModificationsHistoryV1` and return it if no value for `ModificationsHistoryV2` // was found. This is necessary to avoid scenarios where it is possible to // roll back twice to the same block height -fn multiversion_take( - storage_transaction: &mut StorageTransaction<&RocksDb>>, +fn multiversion_take( + storage_transaction: &mut StorageTransaction, height: u64, modifications_history_migration_in_progress: bool, ) -> StorageResult> where Description: DatabaseDescription, + T: KeyValueInspect>, { // This will cause the V2 key to be removed in case the storage transaction snapshot // a conflicting transaction writes a value for it, but that update is not reflected @@ -472,14 +497,15 @@ where } } -fn multiversion_replace( - storage_transaction: &mut StorageTransaction<&RocksDb>>, +fn multiversion_replace( + storage_transaction: &mut StorageTransaction, height: u64, changes: &Changes, modifications_history_migration_in_progress: bool, ) -> StorageResult> where Description: DatabaseDescription, + T: KeyValueInspect>, { let v2_last_changes = storage_transaction .storage_as_mut::>() @@ -495,14 +521,15 @@ where } } -fn cleanup_old_changes( +fn cleanup_old_changes( height: &u64, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, state_rewind_policy: &StateRewindPolicy, modifications_history_migration_in_progress: bool, ) -> StorageResult<()> where Description: DatabaseDescription, + T: KeyValueInspect>, { match state_rewind_policy { StateRewindPolicy::NoRewind => { @@ -532,13 +559,14 @@ where Ok(()) } -fn remove_historical_modifications( +fn remove_historical_modifications( old_height: &u64, - storage_transaction: &mut StorageTransaction<&RocksDb>>, + storage_transaction: &mut StorageTransaction, reverse_changes: &Changes, ) -> StorageResult<()> where Description: DatabaseDescription, + T: KeyValueInspect>, { let changes = reverse_changes .iter() @@ -635,42 +663,38 @@ where height: Option, changes: Changes, ) -> StorageResult<()> { + let mut migration_changes_lock_guard = self.migration_changes + .lock() + .map_err(|poisoned| { + StorageError::Other(anyhow::anyhow!( + "Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}" + )) + })?; + let migration_changes = std::mem::take(&mut *migration_changes_lock_guard); + // The type of cumulative_tx is InMemoryTransaction>>>, + // which does not implement the Modifiable trait. Instead, we create a new + // transaction of type InMemoryTransaction<&RocksDb>>, + // anc c0 both the current and cumulative changes to it. + + let mut migration_transaction = StorageTransaction::transaction( + &self.db, + ConflictPolicy::Overwrite, + migration_changes, + ); let mut storage_transaction = StorageTransaction::transaction( - &*self.db, + &mut migration_transaction, ConflictPolicy::Overwrite, changes, ); + if let Some(height) = height { self.store_modifications_history(&mut storage_transaction, &height)?; } - // Commit into the in_memory_db that also contains changes from the migration - let mut cumulative_tx = self - .in_memory_changes - .lock() - .map_err(|poisoned| { - StorageError::Other(anyhow::anyhow!( - "Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}" - )) - })?; + storage_transaction.commit()?; // TODO: Handle error - // The type of cumulative_tx is InMemoryTransaction>>>, - // which does not implement the Modifiable trait. Instead, we create a new - // transaction of type InMemoryTransaction<&RocksDb>>, - // anc dommut both the current and cumulative changes to it. - - let cumulative_changes = cumulative_tx.changes().clone(); - - let final_transaction = StorageTransaction::transaction( - storage_transaction, - ConflictPolicy::Overwrite, - cumulative_changes, - ) - .commit()?; - - cumulative_tx.reset_changes(); - - self.db.commit_changes(&final_transaction.into_changes())?; + self.db + .commit_changes(&migration_transaction.into_changes())?; Ok(()) }