Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
acerone85 committed Oct 1, 2024
1 parent 4c42133 commit 0ff920e
Showing 1 changed file with 95 additions and 71 deletions.
166 changes: 95 additions & 71 deletions crates/fuel-core/src/state/historical_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use fuel_core_storage::{
transactional::{
Changes,
ConflictPolicy,
ReadTransaction,
StorageTransaction,
},
Error as StorageError,
Expand Down Expand Up @@ -97,9 +96,9 @@ pub struct HistoricalRocksDB<Description> {
/// The [`StateRewindPolicy`] used by the historical rocksdb
state_rewind_policy: StateRewindPolicy,
/// The Description of the database.
db: Arc<RocksDb<Historical<Description>>>,
db: RocksDb<Historical<Description>>,
/// Changes that have been committed in memory, but not to rocksdb
in_memory_changes: Mutex<StorageTransaction<Arc<RocksDb<Historical<Description>>>>>,
migration_changes: Mutex<Changes>,
/// Indicates whether the [`Column::ModificationsHistory`]
/// is being migrated to [`Column::ModificationsHistoryV2`]
// TODO: Change this to an Atomic variable. (Release for Writing, Acquire for reading)
Expand All @@ -114,16 +113,10 @@ where
db: RocksDb<Historical<Description>>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let db = Arc::new(db);
let db_ref = db.clone();
Ok(Self {
state_rewind_policy,
db,
in_memory_changes: Mutex::new(StorageTransaction::transaction(
db_ref,
ConflictPolicy::Overwrite,
Changes::default(),
)),
migration_changes: Mutex::new(Changes::default()),
modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)),
})
}
Expand All @@ -133,18 +126,11 @@ where
capacity: Option<usize>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let db = Arc::new(RocksDb::<Historical<Description>>::default_open(
path, capacity,
)?);
let db_ref = db.clone();
let db = RocksDb::<Historical<Description>>::default_open(path, capacity)?;
Ok(Self {
state_rewind_policy,
db,
in_memory_changes: Mutex::new(StorageTransaction::transaction(
db_ref,
ConflictPolicy::Overwrite,
Changes::default(),
)),
migration_changes: Mutex::new(Changes::default()),
modifications_history_migration_in_progress: Arc::new(AtomicBool::new(true)),
})
}
Expand Down Expand Up @@ -221,11 +207,14 @@ where
Ok(ViewAtHeight::new(rollback_height, latest_view))
}

fn store_modifications_history(
fn store_modifications_history<T>(
&self,
storage_transaction: &mut StorageTransaction<&RocksDb<Historical<Description>>>,
storage_transaction: &mut StorageTransaction<T>,
height: &Description::Height,
) -> StorageResult<()> {
) -> StorageResult<()>
where
T: KeyValueInspect<Column = Column<Description>>,
{
let modifications_history_migration_in_progress = self
.modifications_history_migration_in_progress
.load(Acquire);
Expand Down Expand Up @@ -368,7 +357,24 @@ where
}

fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
let mut storage_transaction = (*self.db).read_transaction();
let mut migration_changes_lock_guard =
self.migration_changes.lock().map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!("Lock was poisoned: {poisoned:?}"))
})?;

let migration_changes = std::mem::take(&mut *migration_changes_lock_guard);

let mut migration_transaction = StorageTransaction::transaction(
&self.db,
ConflictPolicy::Overwrite,
migration_changes,
);

let mut storage_transaction = StorageTransaction::transaction(
&mut migration_transaction,
ConflictPolicy::Overwrite,
Changes::default(),
);

let modifications_history_migration_in_progress = self
.modifications_history_migration_in_progress
Expand All @@ -394,23 +400,39 @@ where
)
.commit()?;

storage_transaction.commit()?;

self.db
.commit_changes(&storage_transaction.into_changes())?;
.commit_changes(&migration_transaction.into_changes())?;

Ok(())
}

/// Migrates a ModificationHistory key-value pair from V1 to V2.
/// The migration fails if a concurrent transaction which rollbacks
/// to the `height` being migrated commits before this
/// migration transaction commits.
/// The migration is lazy, in that the changes necessary to perform
/// the mivration from ModificationHistoryV1 to ModificationHistoryV2
/// are simply recorded, and will be flushed to the database when it
/// commits or rollbacks to a new height.
pub fn migrate_modifications_history_at_height(
&self,
height: u64,
) -> StorageResult<()> {
let mut migration_transaction = StorageTransaction::transaction(
let mut changes_guard = self.migration_changes.lock().map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!("Lock is poisoned: {poisoned:?}"))
})?;

let current_changes = std::mem::take(&mut *changes_guard);

let mut cumulative_tx = StorageTransaction::transaction(
&self.db,
ConflictPolicy::Fail,
ConflictPolicy::Overwrite,
current_changes,
);

// TODO: I could add a second migration transaction and commit the second transaction into the first
let mut migration_transaction = StorageTransaction::transaction(
&mut cumulative_tx,
ConflictPolicy::Overwrite,
Changes::default(),
);

Expand All @@ -420,14 +442,16 @@ where
if let Some(v1_changes) = v1_changes {
migration_transaction
.storage_as_mut::<ModificationsHistoryV2<Description>>()
.insert(&height, &v1_changes)?;
.insert(&height, &v1_changes)?; // TODO: If this is an error we want to at least write back the old changes
};

self.db
.commit_changes(&migration_transaction.into_changes())?;
Ok(())
} else {
Ok(())
}
// Add the changes into the cumulative changes
migration_transaction.commit()?;

let cumulative_changes = cumulative_tx.into_changes();
*changes_guard = cumulative_changes;

Ok(())
}

pub fn v1_entries(&self) -> BoxedIter<StorageResult<(u64, Changes)>> {
Expand All @@ -447,13 +471,14 @@ where
// `ModificationsHistoryV1` and return it if no value for `ModificationsHistoryV2`
// was found. This is necessary to avoid scenarios where it is possible to
// roll back twice to the same block height
fn multiversion_take<Description>(
storage_transaction: &mut StorageTransaction<&RocksDb<Historical<Description>>>,
fn multiversion_take<Description, T>(
storage_transaction: &mut StorageTransaction<T>,
height: u64,
modifications_history_migration_in_progress: bool,
) -> StorageResult<Option<Changes>>
where
Description: DatabaseDescription,
T: KeyValueInspect<Column = Column<Description>>,
{
// This will cause the V2 key to be removed in case the storage transaction snapshot
// a conflicting transaction writes a value for it, but that update is not reflected
Expand All @@ -472,14 +497,15 @@ where
}
}

fn multiversion_replace<Description>(
storage_transaction: &mut StorageTransaction<&RocksDb<Historical<Description>>>,
fn multiversion_replace<Description, T>(
storage_transaction: &mut StorageTransaction<T>,
height: u64,
changes: &Changes,
modifications_history_migration_in_progress: bool,
) -> StorageResult<Option<Changes>>
where
Description: DatabaseDescription,
T: KeyValueInspect<Column = Column<Description>>,
{
let v2_last_changes = storage_transaction
.storage_as_mut::<ModificationsHistoryV2<Description>>()
Expand All @@ -495,14 +521,15 @@ where
}
}

fn cleanup_old_changes<Description>(
fn cleanup_old_changes<Description, T>(
height: &u64,
storage_transaction: &mut StorageTransaction<&RocksDb<Historical<Description>>>,
storage_transaction: &mut StorageTransaction<T>,
state_rewind_policy: &StateRewindPolicy,
modifications_history_migration_in_progress: bool,
) -> StorageResult<()>
where
Description: DatabaseDescription,
T: KeyValueInspect<Column = Column<Description>>,
{
match state_rewind_policy {
StateRewindPolicy::NoRewind => {
Expand Down Expand Up @@ -532,13 +559,14 @@ where
Ok(())
}

fn remove_historical_modifications<Description>(
fn remove_historical_modifications<Description, T>(
old_height: &u64,
storage_transaction: &mut StorageTransaction<&RocksDb<Historical<Description>>>,
storage_transaction: &mut StorageTransaction<T>,
reverse_changes: &Changes,
) -> StorageResult<()>
where
Description: DatabaseDescription,
T: KeyValueInspect<Column = Column<Description>>,
{
let changes = reverse_changes
.iter()
Expand Down Expand Up @@ -635,42 +663,38 @@ where
height: Option<Description::Height>,
changes: Changes,
) -> StorageResult<()> {
let mut migration_changes_lock_guard = self.migration_changes
.lock()
.map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!(
"Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}"
))
})?;
let migration_changes = std::mem::take(&mut *migration_changes_lock_guard);
// The type of cumulative_tx is InMemoryTransaction<Arc<RocksDb<Historical<_>>>>,
// which does not implement the Modifiable trait. Instead, we create a new
// transaction of type InMemoryTransaction<&RocksDb<Historical<_>>>,
// anc c0 both the current and cumulative changes to it.

Check warning on line 677 in crates/fuel-core/src/state/historical_rocksdb.rs

View workflow job for this annotation

GitHub Actions / find-typos

"anc" should be "and".

let mut migration_transaction = StorageTransaction::transaction(
&self.db,
ConflictPolicy::Overwrite,
migration_changes,
);
let mut storage_transaction = StorageTransaction::transaction(
&*self.db,
&mut migration_transaction,
ConflictPolicy::Overwrite,
changes,
);

if let Some(height) = height {
self.store_modifications_history(&mut storage_transaction, &height)?;
}

// Commit into the in_memory_db that also contains changes from the migration
let mut cumulative_tx = self
.in_memory_changes
.lock()
.map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!(
"Tried to acquire lock to in-memory changes but it was poisoned: {poisoned:?}"
))
})?;
storage_transaction.commit()?; // TODO: Handle error

// The type of cumulative_tx is InMemoryTransaction<Arc<RocksDb<Historical<_>>>>,
// which does not implement the Modifiable trait. Instead, we create a new
// transaction of type InMemoryTransaction<&RocksDb<Historical<_>>>,
// anc dommut both the current and cumulative changes to it.

let cumulative_changes = cumulative_tx.changes().clone();

let final_transaction = StorageTransaction::transaction(
storage_transaction,
ConflictPolicy::Overwrite,
cumulative_changes,
)
.commit()?;

cumulative_tx.reset_changes();

self.db.commit_changes(&final_transaction.into_changes())?;
self.db
.commit_changes(&migration_transaction.into_changes())?;
Ok(())
}

Expand Down

0 comments on commit 0ff920e

Please sign in to comment.