-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cat-gateway): Purge volatile data after immutable roll forward (#…
…1188) * wip(cat-gateway): add select and delete queries for purging volatile data * wip(cat-gateway): refactor purge queries * feat(cat-gateway): refactor session execution code for reuse * feat(cat-gateway): add purge_queries to CassandraSession * fix(cat-gateway): refactor into queries::purge module * wip(cat-gateway): add purge methods to cassandra session * wip(cat-gateway): rust code for txo by stake queries * wip(cat-gateway): refactor queries for purging volatile data * wip(cat-gateway): refactor queries for purging volatile data * fix(cat-gateway): modify types for query result * fix(cat-gateway): cleanup naming types * chore(cat-gateway): fix spelling * chore(cat-gateway): fix spelling * wip(cat-gateway): rust code for txo assets by stake queries * fix(cat-gateway): cleanup field names * fix(cat-gateway): incorporate unstaked txo queries, cleanup cql comments * fix(cat-gateway): incorporate unstaked txi queries, cleanup cql comments * fix(cat-gateway): incorporate stake registration queries, cleanup cql comments * fix(cat-gateway): incorporate cip36 registration queries * fix(cat-gateway): incorporate cip36 registration for votekey queries * chore(cat-gateway): fix spelling * fix(cat-gateway): incorporate cip36 registration for votekey queries * fix(cat-gateway): incorporate cip36 invalid registration queries * fix(cat-gateway): incorporate rbac509 registration queries * fix(cat-gateway): incorporate chain root for role0 registration queries * chore(cat-gateway): fix CQL comments * fix(cat-gateway): incorporate chain root for stake addr and txn id registration queries * fix(cat-gateway): box large futures lint * feat(cat-gateway): add PURGE_SLOT_BUFFER parameter * fix(cat-gateway): update scylla type usage * fix(cat-gateway): fix prepared query variants * wip(cat-gateway): add purge command when only one chain follower is left * feat(cat-gateway): purge roll forward volatile data * fix(cat-gateway): correct names for chain root queries * feat(cat-gateway): wrap up queries to purge volatile data * fix(cat-gateway): cleanup * fix(cat-gateway): update cardano-chain-follower to v0.0.6 * test(rm schema check): temp * revert schema_version_check * revert `/health/live` endpoint * fix purge queris * remain comment * fix --------- Co-authored-by: Steven Johnson <[email protected]> Co-authored-by: cong-or <[email protected]> Co-authored-by: Mr-Leshiy <[email protected]>
- Loading branch information
1 parent
6358f6e
commit 5c875d3
Showing
50 changed files
with
2,640 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
314 changes: 314 additions & 0 deletions
314
catalyst-gateway/bin/src/db/index/block/roll_forward.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
//! Immutable Roll Forward logic. | ||
use std::{collections::HashSet, sync::Arc}; | ||
|
||
use futures::StreamExt; | ||
|
||
use crate::{ | ||
db::index::{block::CassandraSession, queries::purge}, | ||
settings::Settings, | ||
}; | ||
|
||
/// Purge Data from Live Index | ||
pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> { | ||
let persistent = false; // get volatile session | ||
let Some(session) = CassandraSession::get(persistent) else { | ||
anyhow::bail!("Failed to acquire db session"); | ||
}; | ||
|
||
// Purge data up to this slot | ||
let purge_to_slot: num_bigint::BigInt = purge_slot | ||
.saturating_sub(Settings::purge_slot_buffer()) | ||
.into(); | ||
|
||
let txn_hashes = purge_txi_by_hash(&session, &purge_to_slot).await?; | ||
purge_chain_root_for_role0_key(&session, &purge_to_slot).await?; | ||
purge_chain_root_for_stake_address(&session, &purge_to_slot).await?; | ||
purge_chain_root_for_txn_id(&session, &txn_hashes).await?; | ||
purge_cip36_registration(&session, &purge_to_slot).await?; | ||
purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?; | ||
purge_cip36_registration_invalid(&session, &purge_to_slot).await?; | ||
purge_rbac509_registration(&session, &purge_to_slot).await?; | ||
purge_stake_registration(&session, &purge_to_slot).await?; | ||
purge_txo_ada(&session, &purge_to_slot).await?; | ||
purge_txo_assets(&session, &purge_to_slot).await?; | ||
purge_unstaked_txo_ada(&session, &purge_to_slot).await?; | ||
purge_unstaked_txo_assets(&session, &purge_to_slot).await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Purge data from `chain_root_for_role0_key`. | ||
async fn purge_chain_root_for_role0_key( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `chain_root_for_stake_address`. | ||
async fn purge_chain_root_for_stake_address( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `chain_root_for_txn_id`. | ||
async fn purge_chain_root_for_txn_id( | ||
session: &Arc<CassandraSession>, txn_hashes: &HashSet<Vec<u8>>, | ||
) -> anyhow::Result<()> { | ||
use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if txn_hashes.contains(¶ms.transaction_id) { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `cip36_registration`. | ||
async fn purge_cip36_registration( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `cip36_registration_for_vote_key`. | ||
async fn purge_cip36_registration_for_vote_key( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `cip36_registration_invalid`. | ||
async fn purge_cip36_registration_invalid( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `rbac509_registration`. | ||
async fn purge_rbac509_registration( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `stake_registration`. | ||
async fn purge_stake_registration( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `txi_by_hash`. | ||
async fn purge_txi_by_hash( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<HashSet<Vec<u8>>> { | ||
use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
let mut txn_hashes: HashSet<Vec<u8>> = HashSet::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
if &primary_key.2 <= purge_to_slot { | ||
let params: Params = primary_key.into(); | ||
txn_hashes.insert(params.txn_hash.clone()); | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(txn_hashes) | ||
} | ||
|
||
/// Purge data from `txo_ada`. | ||
async fn purge_txo_ada( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `txo_assets`. | ||
async fn purge_txo_assets( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `unstaked_txo_ada`. | ||
async fn purge_unstaked_txo_ada( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
if &primary_key.2 <= purge_to_slot { | ||
let params: Params = primary_key.into(); | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Purge data from `unstaked_txo_assets`. | ||
async fn purge_unstaked_txo_assets( | ||
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt, | ||
) -> anyhow::Result<()> { | ||
use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; | ||
|
||
// Get all keys | ||
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; | ||
// Filter | ||
let mut delete_params: Vec<Params> = Vec::new(); | ||
while let Some(Ok(primary_key)) = primary_keys_stream.next().await { | ||
let params: Params = primary_key.into(); | ||
if ¶ms.slot_no <= purge_to_slot { | ||
delete_params.push(params); | ||
} | ||
} | ||
// Delete filtered keys | ||
DeleteQuery::execute(session, delete_params).await?; | ||
Ok(()) | ||
} |
Oops, something went wrong.