diff --git a/catalyst-gateway/bin/src/cardano/mod.rs b/catalyst-gateway/bin/src/cardano/mod.rs index 10d32c98397..e93fc609005 100644 --- a/catalyst-gateway/bin/src/cardano/mod.rs +++ b/catalyst-gateway/bin/src/cardano/mod.rs @@ -242,7 +242,7 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle { params.backoff().await; // Wait for indexing DB to be ready before continuing. - CassandraSession::wait_is_ready(INDEXING_DB_READY_WAIT_INTERVAL).await; + drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await); info!(chain=%params.chain, params=%params,"Indexing DB is ready"); let mut first_indexed_block = params.first_indexed_block.clone(); @@ -377,7 +377,7 @@ impl SyncTask { // Wait for indexing DB to be ready before continuing. // We do this after the above, because other nodes may have finished already, and we don't // want to wait do any work they already completed while we were fetching the blockchain. - CassandraSession::wait_is_ready(INDEXING_DB_READY_WAIT_INTERVAL).await; + drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await); info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state"); self.sync_status = get_sync_status().await; debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status); diff --git a/catalyst-gateway/bin/src/db/index/block/certs.rs b/catalyst-gateway/bin/src/db/index/block/certs.rs index 72b2d8165ae..4bc0d963a35 100644 --- a/catalyst-gateway/bin/src/db/index/block/certs.rs +++ b/catalyst-gateway/bin/src/db/index/block/certs.rs @@ -111,7 +111,7 @@ impl StakeRegistrationInsertQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_STAKE_REGISTRATION_QUERY, cfg, @@ -119,13 +119,11 @@ impl StakeRegistrationInsertQuery { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert Stake Registration Query."); - }; - - insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs index d0a94fe7d5a..d6ba6fde5d6 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs @@ -16,7 +16,7 @@ const INSERT_CIP36_REGISTRATION_QUERY: &str = include_str!("./cql/insert_cip36.c /// Insert CIP-36 Registration Query Parameters #[derive(SerializeRow, Clone)] -pub(super) struct Params { +pub(crate) struct Params { /// Full Stake Address (not hashed, 32 byte ED25519 Public key). stake_address: Vec, /// Nonce value after normalization. @@ -81,10 +81,10 @@ impl Params { } /// Prepare Batch of Insert CIP-36 Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CIP36_REGISTRATION_QUERY, cfg, @@ -92,12 +92,10 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert CIP-36 Registration Query."); - }; - - insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert CIP-36 Registration Query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_CIP36_REGISTRATION_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs index e990443e309..a4239dced66 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs @@ -17,7 +17,7 @@ const INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY: &str = /// Insert CIP-36 Registration Invalid Query Parameters #[derive(SerializeRow, Debug)] -pub(super) struct Params { +pub(crate) struct Params { /// Voting Public Key vote_key: Vec, /// Full Stake Address (not hashed, 32 byte ED25519 Public key). @@ -48,10 +48,10 @@ impl Params { } /// Prepare Batch of Insert CIP-36 Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY, cfg, @@ -59,12 +59,12 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert CIP-36 Registration Query."); - }; - - insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert CIP-36 Registration Query."), + ) + .map_err(|error| { + anyhow::anyhow!("{error}\n--\n{INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY}") + }) } } diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs index a5a82e1d26e..172035bbc72 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs @@ -17,7 +17,7 @@ const INSERT_CIP36_REGISTRATION_INVALID_QUERY: &str = /// Insert CIP-36 Registration Invalid Query Parameters #[derive(SerializeRow, Clone)] -pub(super) struct Params { +pub(crate) struct Params { /// Full Stake Address (not hashed, 32 byte ED25519 Public key). stake_address: Vec, /// Slot Number the cert is in. @@ -98,10 +98,10 @@ impl Params { } /// Prepare Batch of Insert CIP-36 Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CIP36_REGISTRATION_INVALID_QUERY, cfg, @@ -109,12 +109,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert CIP-36 Registration Invalid Query."); - }; - - insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert CIP-36 Registration Invalid Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_CIP36_REGISTRATION_INVALID_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs b/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs index a03dca54e2d..0bdef2fd344 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs @@ -1,8 +1,8 @@ //! Index CIP-36 Registrations. -mod insert_cip36; -mod insert_cip36_for_vote_key; -mod insert_cip36_invalid; +pub(crate) mod insert_cip36; +pub(crate) mod insert_cip36_for_vote_key; +pub(crate) mod insert_cip36_invalid; use std::sync::Arc; diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_role0_key.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_role0_key.rs index ab6ec05b814..9ed1b0e30b5 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_role0_key.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_role0_key.rs @@ -15,7 +15,7 @@ const INSERT_CHAIN_ROOT_FOR_ROLE0_KEY_QUERY: &str = /// Insert Chain Root For Role 0 Key Query Parameters #[derive(SerializeRow)] -pub(super) struct Params { +pub(crate) struct Params { /// Role 0 Key Hash. 32 bytes. role0_key: Vec, /// Block Slot Number @@ -39,7 +39,7 @@ impl Debug for Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new(role0_key: &[u8], chain_root: &[u8], slot_no: u64, txn: i16) -> Self { + pub(crate) fn new(role0_key: &[u8], chain_root: &[u8], slot_no: u64, txn: i16) -> Self { Params { role0_key: role0_key.to_vec(), slot_no: num_bigint::BigInt::from(slot_no), @@ -49,10 +49,10 @@ impl Params { } /// Prepare Batch of RBAC Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CHAIN_ROOT_FOR_ROLE0_KEY_QUERY, cfg, @@ -60,12 +60,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert Chain Root For Role 0 Key Query."); - }; - - insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert Chain Root For Role 0 Key Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_CHAIN_ROOT_FOR_ROLE0_KEY_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_stake_address.rs index 172650c51db..f46a46ef1fd 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_stake_address.rs @@ -15,7 +15,7 @@ const INSERT_CHAIN_ROOT_FOR_STAKE_ADDRESS_QUERY: &str = /// Insert Chain Root For Stake Address Query Parameters #[derive(SerializeRow)] -pub(super) struct Params { +pub(crate) struct Params { /// Stake Address Hash. 32 bytes. stake_addr: Vec, /// Block Slot Number @@ -39,7 +39,7 @@ impl Debug for Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new(stake_addr: &[u8], chain_root: &[u8], slot_no: u64, txn: i16) -> Self { + pub(crate) fn new(stake_addr: &[u8], chain_root: &[u8], slot_no: u64, txn: i16) -> Self { Params { stake_addr: stake_addr.to_vec(), slot_no: num_bigint::BigInt::from(slot_no), @@ -49,10 +49,10 @@ impl Params { } /// Prepare Batch of RBAC Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CHAIN_ROOT_FOR_STAKE_ADDRESS_QUERY, cfg, @@ -60,12 +60,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert Chain Root For Stake Address Query."); - }; - - insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert Chain Root For Stake Address Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_CHAIN_ROOT_FOR_STAKE_ADDRESS_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_txn_id.rs index 300b81b580f..1af8540929d 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_txn_id.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_chain_root_for_txn_id.rs @@ -15,7 +15,7 @@ const INSERT_CHAIN_ROOT_FOR_TXN_ID_QUERY: &str = /// Insert Chain Root For Transaction ID Query Parameters #[derive(SerializeRow)] -pub(super) struct Params { +pub(crate) struct Params { /// Transaction ID Hash. 32 bytes. transaction_id: Vec, /// Chain Root Hash. 32 bytes. @@ -33,7 +33,7 @@ impl Debug for Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new(chain_root: &[u8], transaction_id: &[u8]) -> Self { + pub(crate) fn new(chain_root: &[u8], transaction_id: &[u8]) -> Self { Params { transaction_id: transaction_id.to_vec(), chain_root: chain_root.to_vec(), @@ -41,10 +41,10 @@ impl Params { } /// Prepare Batch of RBAC Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_CHAIN_ROOT_FOR_TXN_ID_QUERY, cfg, @@ -52,12 +52,10 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert Chain Root For TXN ID Query."); - }; - - insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert Chain Root For TXN ID Query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_CHAIN_ROOT_FOR_TXN_ID_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs index 74fad121b4f..ff7287ac4d5 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs @@ -16,7 +16,7 @@ const INSERT_RBAC509_QUERY: &str = include_str!("./cql/insert_rbac509.cql"); /// Insert RBAC Registration Query Parameters #[derive(SerializeRow)] -pub(super) struct Params { +pub(crate) struct Params { /// Chain Root Hash. 32 bytes. chain_root: Vec, /// Transaction ID Hash. 32 bytes. @@ -50,7 +50,7 @@ impl Debug for Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new( + pub(crate) fn new( chain_root: &[u8], transaction_id: &[u8], slot_no: u64, txn: i16, cip509: &Cip509, ) -> Self { Params { @@ -68,10 +68,10 @@ impl Params { } /// Prepare Batch of RBAC Registration Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &EnvVars, ) -> anyhow::Result { - let insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_RBAC509_QUERY, cfg, @@ -79,12 +79,10 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = insert_queries { - error!(error=%error,"Failed to prepare Insert RBAC 509 Registration Query."); - }; - - insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert RBAC 509 Registration Query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_RBAC509_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs index 99af25e9e6e..fe048146154 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs @@ -1,9 +1,9 @@ //! Index Role-Based Access Control (RBAC) Registration. -mod insert_chain_root_for_role0_key; -mod insert_chain_root_for_stake_address; -mod insert_chain_root_for_txn_id; -mod insert_rbac509; +pub(crate) mod insert_chain_root_for_role0_key; +pub(crate) mod insert_chain_root_for_stake_address; +pub(crate) mod insert_chain_root_for_txn_id; +pub(crate) mod insert_rbac509; use std::sync::{Arc, LazyLock}; diff --git a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs index 46994b5d4c7..1c0b81bf30c 100644 --- a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs +++ b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs @@ -303,8 +303,8 @@ async fn purge_unstaked_txo_assets( // Filter let mut delete_params: Vec = Vec::new(); while let Some(Ok(primary_key)) = primary_keys_stream.next().await { - let params: Params = primary_key.into(); - if ¶ms.slot_no <= purge_to_slot { + if &primary_key.4 <= purge_to_slot { + let params: Params = primary_key.into(); delete_params.push(params); } } diff --git a/catalyst-gateway/bin/src/db/index/block/txi.rs b/catalyst-gateway/bin/src/db/index/block/txi.rs index f8ff02da8af..5e7d1adddc4 100644 --- a/catalyst-gateway/bin/src/db/index/block/txi.rs +++ b/catalyst-gateway/bin/src/db/index/block/txi.rs @@ -56,7 +56,7 @@ impl TxiInsertQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let txi_insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_TXI_QUERY, cfg, @@ -64,13 +64,9 @@ impl TxiInsertQuery { true, false, ) - .await; - - if let Err(ref error) = txi_insert_queries { - error!(error=%error,"Failed to prepare Insert TXI Query."); - }; - - txi_insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}")) } /// Index the transaction Inputs. diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs index d6c8b7702c9..46eb58db84e 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs @@ -18,7 +18,7 @@ const INSERT_TXO_QUERY: &str = include_str!("./cql/insert_txo.cql"); /// Insert TXO Query Parameters /// (Superset of data to support both Staked and Unstaked TXO records.) #[derive(SerializeRow, Debug)] -pub(super) struct Params { +pub(crate) struct Params { /// Stake Address - Binary 28 bytes. 0 bytes = not staked. stake_address: Vec, /// Block Slot Number @@ -37,7 +37,7 @@ pub(super) struct Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new( + pub(crate) fn new( stake_address: &[u8], slot_no: u64, txn: i16, txo: i16, address: &str, value: u64, txn_hash: &[u8], ) -> Self { @@ -53,10 +53,10 @@ impl Params { } /// Prepare Batch of Staked Insert TXO Asset Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let txo_insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_TXO_QUERY, cfg, @@ -64,12 +64,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = txo_insert_queries { - error!(error=%error,"Failed to prepare Insert TXO Asset Query."); - }; - - txo_insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXO Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXO_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs index 69e547594a7..6c430b9f408 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs @@ -16,7 +16,7 @@ const INSERT_TXO_ASSET_QUERY: &str = include_str!("./cql/insert_txo_asset.cql"); /// Insert TXO Asset Query Parameters /// (Superset of data to support both Staked and Unstaked TXO records.) #[derive(SerializeRow, Debug)] -pub(super) struct Params { +pub(crate) struct Params { /// Stake Address - Binary 28 bytes. 0 bytes = not staked. stake_address: Vec, /// Block Slot Number @@ -39,7 +39,7 @@ impl Params { /// Note Value can be either a u64 or an i64, so use a i128 to represent all possible /// values. #[allow(clippy::too_many_arguments)] - pub(super) fn new( + pub(crate) fn new( stake_address: &[u8], slot_no: u64, txn: i16, txo: i16, policy_id: &[u8], asset_name: &[u8], value: i128, ) -> Self { @@ -55,10 +55,10 @@ impl Params { } /// Prepare Batch of Staked Insert TXO Asset Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let txo_insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_TXO_ASSET_QUERY, cfg, @@ -66,12 +66,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = txo_insert_queries { - error!(error=%error,"Failed to prepare Insert TXO Asset Query."); - }; - - txo_insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXO Asset Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXO_ASSET_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs index 7f68823af25..e308b883f30 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs @@ -15,7 +15,7 @@ const INSERT_UNSTAKED_TXO_QUERY: &str = include_str!("./cql/insert_unstaked_txo. /// Insert TXO Unstaked Query Parameters /// (Superset of data to support both Staked and Unstaked TXO records.) #[derive(SerializeRow, Debug)] -pub(super) struct Params { +pub(crate) struct Params { /// Transactions hash. txn_hash: Vec, /// Transaction Output Offset inside the transaction. @@ -32,7 +32,7 @@ pub(super) struct Params { impl Params { /// Create a new record for this transaction. - pub(super) fn new( + pub(crate) fn new( txn_hash: &[u8], txo: i16, slot_no: u64, txn: i16, address: &str, value: u64, ) -> Self { Self { @@ -46,10 +46,10 @@ impl Params { } /// Prepare Batch of Staked Insert TXO Asset Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let txo_insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_UNSTAKED_TXO_QUERY, cfg, @@ -57,12 +57,8 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = txo_insert_queries { - error!(error=%error,"Failed to prepare Insert TXO Asset Query."); - }; - - txo_insert_queries + .await + .inspect_err(|error| error!(error=%error,"Failed to prepare Unstaked Insert TXO Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_UNSTAKED_TXO_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs index 7436d36d200..14588711ae2 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs @@ -16,7 +16,7 @@ const INSERT_UNSTAKED_TXO_ASSET_QUERY: &str = include_str!("./cql/insert_unstake /// Insert TXO Asset Query Parameters /// (Superset of data to support both Staked and Unstaked TXO records.) #[derive(SerializeRow, Debug)] -pub(super) struct Params { +pub(crate) struct Params { /// Transactions hash. txn_hash: Vec, /// Transaction Output Offset inside the transaction. @@ -39,7 +39,7 @@ impl Params { /// Note Value can be either a u64 or an i64, so use a i128 to represent all possible /// values. #[allow(clippy::too_many_arguments)] - pub(super) fn new( + pub(crate) fn new( txn_hash: &[u8], txo: i16, policy_id: &[u8], asset_name: &[u8], slot_no: u64, txn: i16, value: i128, ) -> Self { @@ -55,10 +55,10 @@ impl Params { } /// Prepare Batch of Staked Insert TXO Asset Index Data Queries - pub(super) async fn prepare_batch( + pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let txo_insert_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), INSERT_UNSTAKED_TXO_ASSET_QUERY, cfg, @@ -66,12 +66,10 @@ impl Params { true, false, ) - .await; - - if let Err(ref error) = txo_insert_queries { - error!(error=%error,"Failed to prepare Insert Unstaked TXO Asset Query."); - }; - - txo_insert_queries + .await + .inspect_err( + |error| error!(error=%error,"Failed to prepare Insert Unstaked TXO Asset Query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_UNSTAKED_TXO_ASSET_QUERY}")) } } diff --git a/catalyst-gateway/bin/src/db/index/block/txo/mod.rs b/catalyst-gateway/bin/src/db/index/block/txo/mod.rs index f3f08440e60..768b1ad3f7f 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/mod.rs @@ -2,10 +2,10 @@ //! //! Note, there are multiple ways TXO Data is indexed and they all happen in here. -mod insert_txo; -mod insert_txo_asset; -mod insert_unstaked_txo; -mod insert_unstaked_txo_asset; +pub(crate) mod insert_txo; +pub(crate) mod insert_txo_asset; +pub(crate) mod insert_unstaked_txo; +pub(crate) mod insert_unstaked_txo_asset; use std::sync::Arc; diff --git a/catalyst-gateway/bin/src/db/index/queries/mod.rs b/catalyst-gateway/bin/src/db/index/queries/mod.rs index 0ae6db056c7..710c2a2e66c 100644 --- a/catalyst-gateway/bin/src/db/index/queries/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/mod.rs @@ -401,9 +401,9 @@ async fn session_execute_batch( values: Vec, ) -> FallibleQueryResults { let mut results: Vec = Vec::new(); + let mut errors = Vec::new(); let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1)); - let mut query_failed = false; let query_str = format!("{query}"); for chunk in chunks { @@ -418,14 +418,14 @@ async fn session_execute_batch( Err(err) => { let chunk_str = format!("{chunk:?}"); error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed"); - query_failed = true; + errors.push(err); // Defer failure until all batches have been processed. }, } } - if query_failed { - bail!("Query Failed: {query_str}!"); + if !errors.is_empty() { + bail!("Query Failed: {query_str}! {errors:?}"); } Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs index b13a4103e69..fcaa7500b0c 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs @@ -64,19 +64,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all Chain Root For Role0 Key registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get Chain Root For Role0 Key registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Chain Root For Role0 Key registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all Chain Root For Role0 Key registration primary keys. @@ -103,7 +101,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -111,8 +109,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete Chain Root For Role0 Key registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs index ac925a2c350..0cc9a64c673 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs @@ -65,19 +65,17 @@ impl PrimaryKeyQuery { /// Prepares a query to get all Chain Root For Stake Address registration primary /// keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get Chain Root For Stake Address registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Chain Root For Stake Address registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all Chain Root For Stake Address registration primary @@ -105,7 +103,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -113,8 +111,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete Chain Root For Stake Address registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs index aeb258daf56..aeaec58f72f 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs @@ -61,19 +61,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all Chain Root For TX ID registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get Chain Root For TX ID registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Chain Root For TX ID registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all Chain Root For TX ID registration primary keys. @@ -100,7 +98,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -108,8 +106,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete Chain Root For TX ID registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs index 3ba4b2b4d83..96bf0af4e6e 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs @@ -68,19 +68,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all CIP-36 registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get CIP-36 registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get CIP-36 registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all CIP-36 registration primary keys. @@ -107,7 +105,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -115,8 +113,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete CIP-36 registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs index 7f4dbd02dd7..7381524cc5b 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs @@ -72,19 +72,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all CIP-36 registration by Vote key primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all CIP-36 registration by Vote key primary keys. @@ -111,7 +109,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -119,8 +117,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete CIP-36 registration by Vote key primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs index 13e7edd70ad..36924bd61a5 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs @@ -64,19 +64,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all CIP-36 invalid registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all CIP-36 invalid registration primary keys. @@ -103,7 +101,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -111,8 +109,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete CIP-36 invalid registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql index 062f3388847..afacd29fcd2 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql @@ -1,5 +1,6 @@ -- Get all primary keys from ADA or a native asset being spent. SELECT txn_hash, - txo + txo, + slot_no FROM txi_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql index 8f56fd434ef..6b2b6c1ebb3 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql @@ -3,5 +3,6 @@ SELECT txn_hash, txo, policy_id, - asset_name + asset_name, + slot_no FROM unstaked_txo_assets_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql index dffba316e50..230f66a5a98 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql @@ -1,5 +1,6 @@ -- Get ALL Primary Keys from Unstaked Transaction Outputs (Native Assets) per stake address. SELECT txn_hash, - txo + txo, + slot_no FROM unstaked_txo_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs index 4ca10bb3a55..513e8003427 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs @@ -72,19 +72,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all RBAC 509 registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all RBAC 509 registration primary keys. @@ -111,7 +109,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -119,8 +117,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete RBAC 509 registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs index ad5e2270c6f..244c9fa8fd0 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs @@ -22,7 +22,7 @@ pub(crate) mod result { //! Return values for Stake Registration purge queries. /// Primary Key Row - pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16); + pub(crate) type PrimaryKey = (Vec, bool, num_bigint::BigInt, i16); } /// Select primary keys for Stake Registration. @@ -33,6 +33,8 @@ const SELECT_QUERY: &str = include_str!("./cql/get_stake_registration.cql"); pub(crate) struct Params { /// Stake Address - Binary 28 bytes. 0 bytes = not staked. pub(crate) stake_hash: Vec, + /// Is the address a script address. + pub(crate) script: bool, /// Block Slot Number pub(crate) slot_no: num_bigint::BigInt, /// Transaction Offset inside the block. @@ -43,6 +45,7 @@ impl Debug for Params { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Params") .field("stake_hash", &self.stake_hash) + .field("script", &self.script) .field("slot_no", &self.slot_no) .field("txn", &self.txn) .finish() @@ -53,8 +56,9 @@ impl From for Params { fn from(value: result::PrimaryKey) -> Self { Self { stake_hash: value.0, - slot_no: value.1, - txn: value.2, + script: value.1, + slot_no: value.2, + txn: value.3, } } } @@ -64,19 +68,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all Stake Registration primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get Stake Registration primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Stake Registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all Stake Registration primary keys. @@ -103,7 +105,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -111,8 +113,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete Stake Registration primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs index 002f8be8a5c..ce6ebcbf025 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs @@ -60,19 +60,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all TXI by hash primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get TXI by hash primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get TXI by hash primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all TXI by hash primary keys. @@ -99,7 +97,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -107,8 +105,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete TXI by hash primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs index e2ae31d8637..227b44ca4a6 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs @@ -68,19 +68,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all TXO by stake address primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get TXO by stake address primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get TXO by stake address primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all TXO by stake address primary keys. @@ -107,7 +105,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -115,8 +113,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete TXO by stake address primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs index 760149fcb78..f2e046abd9a 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs @@ -76,19 +76,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all TXO Assets by stake address primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all TXO Assets by stake address primary keys. @@ -115,7 +113,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -123,8 +121,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete TXO Assets by stake address primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs index 99c4d5da22f..719ee8ccef9 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs @@ -61,19 +61,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all Unstaked TXO ADA primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get Unstaked TXO ADA primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Unstaked TXO ADA primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all Unstaked TXO ADA primary keys. @@ -100,7 +98,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -108,8 +106,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete Unstaked TXO ADA primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs index 011240e6516..fc4acdddbe3 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs @@ -22,7 +22,7 @@ pub(crate) mod result { //! Return values for TXO Assets by TXN Hash purge queries. /// Primary Key Row - pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16, i16, Vec, Vec); + pub(crate) type PrimaryKey = (Vec, i16, Vec, Vec, num_bigint::BigInt); } /// Select primary keys for TXO Assets by TXN Hash. @@ -31,26 +31,20 @@ const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_assets_by_txn_ha /// Primary Key Value. #[derive(SerializeRow)] pub(crate) struct Params { - /// Stake Address - Binary 28 bytes. 0 bytes = not staked. - pub(crate) stake_address: Vec, - /// Block Slot Number - pub(crate) slot_no: num_bigint::BigInt, - /// Transaction Offset inside the block. - pub(crate) txn: i16, - /// Transaction Output Offset inside the transaction. + /// 32 byte hash of this transaction. + pub(crate) txn_hash: Vec, + /// Offset in the txo list of the transaction the txo is in. pub(crate) txo: i16, /// Asset Policy Hash - Binary 28 bytes. - policy_id: Vec, + pub(crate) policy_id: Vec, /// Name of the asset, within the Policy. - asset_name: Vec, + pub(crate) asset_name: Vec, } impl Debug for Params { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Params") - .field("stake_address", &self.stake_address) - .field("slot_no", &self.slot_no) - .field("txn", &self.txn) + .field("txn_hash", &self.txn_hash) .field("txo", &self.txo) .field("policy_id", &self.policy_id) .field("asset_name", &self.asset_name) @@ -61,12 +55,10 @@ impl Debug for Params { impl From for Params { fn from(value: result::PrimaryKey) -> Self { Self { - stake_address: value.0, - slot_no: value.1, - txn: value.2, - txo: value.3, - policy_id: value.4, - asset_name: value.5, + txn_hash: value.0, + txo: value.1, + policy_id: value.2, + asset_name: value.3, } } } @@ -76,19 +68,17 @@ pub(crate) struct PrimaryKeyQuery; impl PrimaryKeyQuery { /// Prepares a query to get all TXO Assets by TXN Hash primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - let select_primary_key = PreparedQueries::prepare( + PreparedQueries::prepare( session.clone(), SELECT_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = select_primary_key { - error!(error=%error, "Failed to prepare get TXO Assets by TXN Hash primary key query"); - }; - - select_primary_key + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get TXO Assets by TXN Hash primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) } /// Executes a query to get all TXO Assets by TXN Hash primary keys. @@ -115,7 +105,7 @@ impl DeleteQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let delete_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), DELETE_QUERY, cfg, @@ -123,8 +113,11 @@ impl DeleteQuery { true, false, ) - .await?; - Ok(delete_queries) + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare delete TXO Assets by TXN Hash primary key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) } /// Executes a DELETE Query diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_stakes_and_vote_keys.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_stakes_and_vote_keys.rs index 2143cdedd48..8c3cb850483 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_stakes_and_vote_keys.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_stakes_and_vote_keys.rs @@ -34,17 +34,17 @@ pub(crate) struct GetAllStakesAndVoteKeysQuery { impl GetAllStakesAndVoteKeysQuery { /// Prepares get all `stake_addr` paired with vote keys [(`stake_addr,vote_key`)] pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_all_stake_and_vote_keys = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_ALL_STAKES_AND_VOTE_KEYS, scylla::statement::Consistency::All, true, ) - .await; - - get_all_stake_and_vote_keys.inspect_err( + .await + .inspect_err( |error| error!(error=%error, "Failed to prepare get all (stake addrs, vote_keys)"), ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_ALL_STAKES_AND_VOTE_KEYS}")) } /// Executes get all `stake_addr` paired with vote keys [(`stake_addr,vote_key`)] diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs index 5c6e0d7362f..25aa5c58067 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs @@ -56,19 +56,15 @@ pub(crate) struct GetRegistrationQuery { impl GetRegistrationQuery { /// Prepares a get registration query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_registrations_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_REGISTRATIONS_FROM_STAKE_ADDR_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_registrations_query { - error!(error=%error, "Failed to prepare get registration query."); - }; - - get_registrations_query + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare get registration from stake address query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_REGISTRATIONS_FROM_STAKE_ADDR_QUERY}")) } /// Executes get registration info for given stake addr query. diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs index 134e199e007..a6187e8489a 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_hash.rs @@ -41,19 +41,17 @@ pub(crate) struct GetStakeAddrQuery { impl GetStakeAddrQuery { /// Prepares a get get stake addr from stake hash query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_stake_addr_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_STAKE_ADDR_FROM_STAKE_HASH, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_stake_addr_query { - error!(error=%error, "Failed to prepare get stake addr query."); - }; - - get_stake_addr_query + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get stake addr from stake hash query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_STAKE_ADDR_FROM_STAKE_HASH}")) } /// Executes a get txi by transaction hashes query. diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs index 42bdcb9fa75..77b54f95119 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs @@ -39,19 +39,17 @@ pub(crate) struct GetStakeAddrFromVoteKeyQuery { impl GetStakeAddrFromVoteKeyQuery { /// Prepares a get stake addr from vote key query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_stake_addr_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_STAKE_ADDR_FROM_VOTE_KEY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_stake_addr_query { - error!(error=%error, "Failed to prepare get stake addr from vote key query."); - }; - - get_stake_addr_query + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get stake addr from vote key query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_STAKE_ADDR_FROM_VOTE_KEY}")) } /// Executes a get txi by transaction hashes query. diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs index 130f6bcb2d8..42b7fce86ea 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs @@ -59,19 +59,17 @@ pub(crate) struct GetInvalidRegistrationQuery { impl GetInvalidRegistrationQuery { /// Prepares a get invalid registration query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_invalid_registration_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_INVALID_REGISTRATIONS_FROM_STAKE_ADDR_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_invalid_registration_query { - error!(error=%error, "Failed to prepare get registration query."); - }; - - get_invalid_registration_query + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare get invalid registration from stake address query.")) + .map_err(|error| { + anyhow::anyhow!("{error}\n--\n{GET_INVALID_REGISTRATIONS_FROM_STAKE_ADDR_QUERY}") + }) } /// Executes get invalid registration info for given stake addr query. diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs index 343a7a36c82..4cd1f019630 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs @@ -55,19 +55,15 @@ pub(crate) struct GetAssetsByStakeAddressQuery { impl GetAssetsByStakeAddressQuery { /// Prepares a get assets by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_assets_by_stake_address_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_ASSETS_BY_STAKE_ADDRESS_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_assets_by_stake_address_query { - error!(error=%error, "Failed to prepare get assets by stake address"); - }; - - get_assets_by_stake_address_query + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare get assets by stake address.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_ASSETS_BY_STAKE_ADDRESS_QUERY}")) } /// Executes a get assets by stake address query. diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs index 0bcd368d9c5..86a55c1f530 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs @@ -44,19 +44,15 @@ pub(crate) struct GetTxiByTxnHashesQuery { impl GetTxiByTxnHashesQuery { /// Prepares a get txi query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_txi_by_txn_hashes_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_TXI_BY_TXN_HASHES_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_txi_by_txn_hashes_query { - error!(error=%error, "Failed to prepare get TXI by txn hashes query."); - }; - - get_txi_by_txn_hashes_query + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare get TXI by txn hashes query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_TXI_BY_TXN_HASHES_QUERY}")) } /// Executes a get txi by transaction hashes query. diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs index 9e06f64b542..2c5d0e460cd 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs @@ -54,19 +54,15 @@ pub(crate) struct GetTxoByStakeAddressQuery { impl GetTxoByStakeAddressQuery { /// Prepares a get txo by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let get_txo_by_stake_address_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, GET_TXO_BY_STAKE_ADDRESS_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = get_txo_by_stake_address_query { - error!(error=%error, "Failed to prepare get TXO by stake address"); - }; - - get_txo_by_stake_address_query + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare get TXO by stake address.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_TXO_BY_STAKE_ADDRESS_QUERY}")) } /// Executes a get txo by stake address query. diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs index 3c998690303..2d92163b81f 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs @@ -39,7 +39,7 @@ impl UpdateTxoSpentQuery { pub(crate) async fn prepare_batch( session: Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { - let update_txo_spent_queries = PreparedQueries::prepare_batch( + PreparedQueries::prepare_batch( session.clone(), UPDATE_TXO_SPENT_QUERY, cfg, @@ -47,13 +47,9 @@ impl UpdateTxoSpentQuery { true, false, ) - .await; - - if let Err(ref error) = update_txo_spent_queries { - error!(error=%error,"Failed to prepare update TXO spent query."); - }; - - update_txo_spent_queries + .await + .inspect_err(|error| error!(error=%error, "Failed to prepare update TXO spent query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{UPDATE_TXO_SPENT_QUERY}")) } /// Executes a update txo spent query. diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs index 938962f9e2e..9a073368cb9 100644 --- a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs @@ -60,19 +60,17 @@ pub(crate) struct SyncStatusInsertQuery; impl SyncStatusInsertQuery { /// Prepares a Sync Status Insert query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - let sync_status_insert_query = PreparedQueries::prepare( + PreparedQueries::prepare( session, INSERT_SYNC_STATUS_QUERY, scylla::statement::Consistency::All, true, ) - .await; - - if let Err(ref error) = sync_status_insert_query { - error!(error=%error, "Failed to prepare get Sync Status Insert query."); - }; - - sync_status_insert_query + .await + .inspect_err( + |error| error!(error=%error, "Failed to prepare get Sync Status Insert query."), + ) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_SYNC_STATUS_QUERY}")) } /// Executes a sync status insert query. diff --git a/catalyst-gateway/bin/src/db/index/schema/mod.rs b/catalyst-gateway/bin/src/db/index/schema/mod.rs index ef940f955be..4b26234c01c 100644 --- a/catalyst-gateway/bin/src/db/index/schema/mod.rs +++ b/catalyst-gateway/bin/src/db/index/schema/mod.rs @@ -222,24 +222,35 @@ pub(crate) async fn create_schema( .await .context("Creating Namespace")?; - let mut failed = false; + let mut errors = Vec::with_capacity(SCHEMAS.len()); for (schema, schema_name) in SCHEMAS { match session.prepare(*schema).await { Ok(stmt) => { if let Err(err) = session.execute_unpaged(&stmt, ()).await { - failed = true; error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query"); + errors.push(anyhow::anyhow!( + "Failed to Execute Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}" + )); }; }, Err(err) => { - failed = true; error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query"); + errors.push(anyhow::anyhow!( + "Failed to Prepare Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}" + )); }, } } - anyhow::ensure!(!failed, "Failed to Create Schema"); + if !errors.is_empty() { + let fmt_err: Vec<_> = errors.into_iter().map(|err| format!("{err}")).collect(); + return Err(anyhow::anyhow!(format!( + "{} Error(s): {}", + fmt_err.len(), + fmt_err.join("\n") + ))); + } // Wait for the Schema to be ready. session.await_schema_agreement().await?; diff --git a/catalyst-gateway/bin/src/db/index/session.rs b/catalyst-gateway/bin/src/db/index/session.rs index a5531664984..57f952ca600 100644 --- a/catalyst-gateway/bin/src/db/index/session.rs +++ b/catalyst-gateway/bin/src/db/index/session.rs @@ -12,6 +12,7 @@ use scylla::{ frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager, ExecutionProfile, Session, SessionBuilder, }; +use thiserror::Error; use tokio::fs; use tracing::{error, info}; @@ -49,6 +50,38 @@ pub(crate) enum TlsChoice { Unverified, } +/// Represents errors that can occur while interacting with a Cassandra session. +#[derive(Debug, Error)] +pub(crate) enum CassandraSessionError { + /// Error when creating a session fails. + #[error("Creating session failed: {source}")] + CreatingSessionFailed { + /// The underlying error that caused the session creation to fail. + source: anyhow::Error, + }, + /// Error when schema migration fails. + #[error("Schema migration failed: {source}")] + SchemaMigrationFailed { + /// The underlying error that caused the schema migration to fail. + source: anyhow::Error, + }, + /// Error when preparing queries fails. + #[error("Preparing queries failed: {source}")] + PreparingQueriesFailed { + /// The underlying error that caused query preparation to fail. + source: anyhow::Error, + }, + /// Error when preparing purge queries fails. + #[error("Preparing purge queries failed: {source}")] + PreparingPurgeQueriesFailed { + /// The underlying error that caused purge query preparation to fail. + source: anyhow::Error, + }, + /// Error indicating that the session has already been set. + #[error("Session already set")] + SessionAlreadySet, +} + /// All interaction with cassandra goes through this struct. #[derive(Clone)] pub(crate) struct CassandraSession { @@ -65,6 +98,9 @@ pub(crate) struct CassandraSession { purge_queries: Arc, } +/// Session error while initialization. +static INIT_SESSION_ERROR: OnceLock> = OnceLock::new(); + /// Persistent DB Session. static PERSISTENT_SESSION: OnceLock> = OnceLock::new(); @@ -88,10 +124,18 @@ impl CassandraSession { } /// Wait for the Cassandra Indexing DB to be ready before continuing - pub(crate) async fn wait_is_ready(interval: Duration) { + pub(crate) async fn wait_until_ready( + interval: Duration, ignore_err: bool, + ) -> Result<(), Arc> { loop { + if !ignore_err { + if let Some(err) = INIT_SESSION_ERROR.get() { + return Err(err.clone()); + } + } + if Self::is_ready() { - break; + return Ok(()); } tokio::time::sleep(interval).await; @@ -292,24 +336,30 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { let session = match make_session(&cfg).await { Ok(session) => session, Err(error) => { - let error = format!("{error:?}"); error!( db_type = db_type, - error = error, + error = format!("{error:?}"), "Failed to Create Cassandra DB Session" ); + drop(INIT_SESSION_ERROR.set(Arc::new( + CassandraSessionError::CreatingSessionFailed { source: error }, + ))); continue; }, }; // Set up the Schema for it. if let Err(error) = create_schema(&mut session.clone(), &cfg).await { - let error = format!("{error:?}"); error!( db_type = db_type, - error = error, + error = format!("{error:?}"), "Failed to Create Cassandra DB Schema" ); + drop( + INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SchemaMigrationFailed { + source: error, + })), + ); continue; } @@ -321,6 +371,9 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { error = %error, "Failed to Create Cassandra Prepared Queries" ); + drop(INIT_SESSION_ERROR.set(Arc::new( + CassandraSessionError::PreparingQueriesFailed { source: error }, + ))); continue; }, }; @@ -334,6 +387,9 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { error = %error, "Failed to Create Cassandra Prepared Purge Queries" ); + drop(INIT_SESSION_ERROR.set(Arc::new( + CassandraSessionError::PreparingPurgeQueriesFailed { source: error }, + ))); continue; }, }; @@ -350,9 +406,11 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { if persistent { if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() { error!("Persistent Session already set. This should not happen."); + drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet))); }; } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() { error!("Volatile Session already set. This should not happen."); + drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet))); }; // IF we get here, then everything seems to have worked, so finish init. diff --git a/catalyst-gateway/bin/src/db/index/tests/mod.rs b/catalyst-gateway/bin/src/db/index/tests/mod.rs index 4775780bc2c..0e01a4dad90 100644 --- a/catalyst-gateway/bin/src/db/index/tests/mod.rs +++ b/catalyst-gateway/bin/src/db/index/tests/mod.rs @@ -7,15 +7,22 @@ use tokio::sync::OnceCell; use super::session::CassandraSession; +mod scylla_purge; mod scylla_queries; mod scylla_session; static SHARED_SESSION: OnceCell> = OnceCell::const_new(); +/// Use this message to prevent a long message from getting a session. +/// There is already a function that handling the error with its full form. +const SESSION_ERR_MSG: &str = "Failed to initialize or get a database session."; + async fn setup_test_database() -> Result<(), String> { CassandraSession::init(); - CassandraSession::wait_is_ready(core::time::Duration::from_secs(1)).await; + CassandraSession::wait_until_ready(core::time::Duration::from_secs(1), false) + .await + .map_err(|err| format!("{err}"))?; if !CassandraSession::is_ready() { return Err(String::from("Cassandra session is not ready")); @@ -38,5 +45,9 @@ fn get_session() -> Result<(Arc, Arc), Strin async fn get_shared_session() -> Result<(Arc, Arc), String> { SHARED_SESSION.get_or_init(setup_test_database).await; + if let Some(Err(err)) = SHARED_SESSION.get() { + return Err(err.clone()); + } + get_session() } diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs new file mode 100644 index 00000000000..43e4e0e222c --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs @@ -0,0 +1,800 @@ +//! Integration tests of the `IndexDB` queries testing on its session. +//! This is mainly to test whether the defined purge queries work with the database or +//! not. + +use futures::StreamExt; + +use super::*; +use crate::db::index::{ + block::*, + queries::{purge::*, PreparedQuery}, +}; + +mod helper { + use cardano_chain_follower::Metadata::cip36::{Cip36, VotingPubKey}; + use ed25519_dalek::VerifyingKey; + + pub(super) fn create_dummy_cip36(number: u32) -> (Cip36, VotingPubKey) { + let empty_cip36 = Cip36 { + cip36: None, + voting_keys: vec![], + stake_pk: Some(VerifyingKey::from_bytes(&[u8::try_from(number).unwrap(); 32]).unwrap()), + payment_addr: vec![], + payable: false, + raw_nonce: 0, + nonce: 0, + purpose: 0, + signed: false, + strict_catalyst: true, + }; + + let pub_key = VotingPubKey { + voting_pk: VerifyingKey::from_bytes(&[u8::try_from(number).unwrap(); 32]).unwrap(), + weight: 0, + }; + + (empty_cip36, pub_key) + } +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_chain_root_for_role0_key() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + rbac509::insert_chain_root_for_role0_key::Params::new(&[0], &[0], 0, 0), + rbac509::insert_chain_root_for_role0_key::Params::new(&[1], &[1], 1, 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::ChainRootForRole0KeyInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = chain_root_for_role0_key::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(chain_root_for_role0_key::Params::from) + .collect(); + let row_results = chain_root_for_role0_key::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = chain_root_for_role0_key::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_chain_root_for_stake_address() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + rbac509::insert_chain_root_for_stake_address::Params::new(&[0], &[0], 0, 0), + rbac509::insert_chain_root_for_stake_address::Params::new(&[1], &[1], 1, 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::ChainRootForStakeAddressInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = chain_root_for_stake_address::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(chain_root_for_stake_address::Params::from) + .collect(); + let row_results = chain_root_for_stake_address::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = chain_root_for_stake_address::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_chain_root_for_txn_id() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + rbac509::insert_chain_root_for_txn_id::Params::new(&[0], &[0]), + rbac509::insert_chain_root_for_txn_id::Params::new(&[1], &[1]), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::ChainRootForTxnIdInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = chain_root_for_txn_id::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(chain_root_for_txn_id::Params::from) + .collect(); + let row_results = chain_root_for_txn_id::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = chain_root_for_txn_id::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_cip36_registration_for_vote_key() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let dummy0 = helper::create_dummy_cip36(0); + let dummy1 = helper::create_dummy_cip36(1); + + let data = vec![ + cip36::insert_cip36_for_vote_key::Params::new(&dummy0.1, 0, 0, &dummy0.0, false), + cip36::insert_cip36_for_vote_key::Params::new(&dummy1.1, 1, 1, &dummy1.0, true), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch( + PreparedQuery::Cip36RegistrationForStakeAddrInsertQuery, + data, + ) + .await + .unwrap(); + + // read + let mut row_stream = cip36_registration_for_vote_key::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(cip36_registration_for_vote_key::Params::from) + .collect(); + let row_results = + cip36_registration_for_vote_key::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = cip36_registration_for_vote_key::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_cip36_registration_invalid() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let dummy0 = helper::create_dummy_cip36(0); + let dummy1 = helper::create_dummy_cip36(1); + + let data = vec![ + cip36::insert_cip36_invalid::Params::new(Some(&dummy0.1), 0, 0, &dummy0.0, vec![]), + cip36::insert_cip36_invalid::Params::new(Some(&dummy1.1), 1, 1, &dummy1.0, vec![]), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::Cip36RegistrationInsertErrorQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = cip36_registration_invalid::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(cip36_registration_invalid::Params::from) + .collect(); + let row_results = cip36_registration_invalid::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = cip36_registration_invalid::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_cip36_registration() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let dummy0 = helper::create_dummy_cip36(0); + let dummy1 = helper::create_dummy_cip36(1); + + let data = vec![ + cip36::insert_cip36::Params::new(&dummy0.1, 0, 0, &dummy0.0), + cip36::insert_cip36::Params::new(&dummy1.1, 1, 1, &dummy1.0), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::Cip36RegistrationInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = cip36_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(cip36_registration::Params::from) + .collect(); + let row_results = cip36_registration::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = cip36_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_rbac509_registration() { + use rbac_registration::cardano::cip509::Cip509; + + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + rbac509::insert_rbac509::Params::new(&[0], &[0], 0, 0, &Cip509::default()), + rbac509::insert_rbac509::Params::new(&[1], &[1], 1, 1, &Cip509::default()), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::Rbac509InsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = rbac509_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(rbac509_registration::Params::from) + .collect(); + let row_results = rbac509_registration::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = rbac509_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_stake_registration() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + certs::StakeRegistrationInsertQuery::new(vec![0], 0, 0, vec![0], false, false, false, None), + certs::StakeRegistrationInsertQuery::new(vec![1], 1, 1, vec![1], true, true, true, None), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::StakeRegistrationInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = stake_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(stake_registration::Params::from) + .collect(); + let row_results = stake_registration::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = stake_registration::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_txi_by_hash() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + txi::TxiInsertParams::new(&[0], 0, 0), + txi::TxiInsertParams::new(&[1], 1, 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::TxiInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = txi_by_hash::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(txi_by_hash::Params::from) + .collect(); + let row_results = txi_by_hash::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = txi_by_hash::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_txo_ada() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + txo::insert_txo::Params::new(&[0], 0, 0, 0, "addr0", 0, &[0]), + txo::insert_txo::Params::new(&[1], 1, 1, 1, "addr1", 1, &[1]), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::TxoAdaInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = txo_ada::PrimaryKeyQuery::execute(&session).await.unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows.into_iter().map(txo_ada::Params::from).collect(); + let row_results = txo_ada::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = txo_ada::PrimaryKeyQuery::execute(&session).await.unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_txo_assets() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + txo::insert_txo_asset::Params::new(&[0], 0, 0, 0, &[0], &[0], 0), + txo::insert_txo_asset::Params::new(&[1], 1, 1, 1, &[1], &[1], 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::TxoAssetInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = txo_assets::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(txo_assets::Params::from) + .collect(); + let row_results = txo_assets::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = txo_assets::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_unstaked_txo_ada() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + txo::insert_unstaked_txo::Params::new(&[0], 0, 0, 0, "addr0", 0), + txo::insert_unstaked_txo::Params::new(&[1], 1, 1, 1, "addr1", 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::UnstakedTxoAdaInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = unstaked_txo_ada::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(unstaked_txo_ada::Params::from) + .collect(); + let row_results = unstaked_txo_ada::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = unstaked_txo_ada::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} + +#[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] +#[tokio::test] +async fn test_unstaked_txo_assets() { + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; + + // data + let data = vec![ + txo::insert_unstaked_txo_asset::Params::new(&[0], 0, &[0], &[0], 0, 0, 0), + txo::insert_unstaked_txo_asset::Params::new(&[1], 1, &[1], &[1], 1, 1, 1), + ]; + let data_len = data.len(); + + // insert + session + .execute_batch(PreparedQuery::UnstakedTxoAssetInsertQuery, data) + .await + .unwrap(); + + // read + let mut row_stream = unstaked_txo_assets::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert_eq!(read_rows.len(), data_len); + + // delete + let delete_params = read_rows + .into_iter() + .map(unstaked_txo_assets::Params::from) + .collect(); + let row_results = unstaked_txo_assets::DeleteQuery::execute(&session, delete_params) + .await + .unwrap() + .into_iter() + .all(|r| r.result_not_rows().is_ok()); + + assert!(row_results); + + // re-read + let mut row_stream = unstaked_txo_assets::PrimaryKeyQuery::execute(&session) + .await + .unwrap(); + + let mut read_rows = vec![]; + while let Some(row_res) = row_stream.next().await { + read_rows.push(row_res.unwrap()); + } + + assert!(read_rows.is_empty()); +} diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs index c233b9d02df..da798ce7e16 100644 --- a/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs @@ -22,7 +22,9 @@ use crate::{ #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_assets_by_stake_addr() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetAssetsByStakeAddressQuery::execute( &session, @@ -32,15 +34,16 @@ async fn test_get_assets_by_stake_addr() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_chain_root() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetChainRootQuery::execute(&session, GetChainRootQueryParams { stake_address: vec![], @@ -49,15 +52,16 @@ async fn test_get_chain_root() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_invalid_registration_w_stake_addr() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetInvalidRegistrationQuery::execute( &session, @@ -67,15 +71,16 @@ async fn test_get_invalid_registration_w_stake_addr() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_registrations_by_chain_root() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetRegistrationsByChainRootQuery::execute( &session, @@ -85,15 +90,16 @@ async fn test_get_registrations_by_chain_root() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_registrations_w_stake_addr() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetRegistrationQuery::execute(&session, GetRegistrationParams { stake_address: vec![], @@ -102,15 +108,16 @@ async fn test_get_registrations_w_stake_addr() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_role0_key_chain_root() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetRole0ChainRootQuery::execute(&session, GetRole0ChainRootQueryParams { role0_key: vec![], @@ -119,15 +126,16 @@ async fn test_get_role0_key_chain_root() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_stake_addr_w_stake_key_hash() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetStakeAddrQuery::execute(&session, GetStakeAddrParams { stake_hash: vec![] }) @@ -135,15 +143,16 @@ async fn test_get_stake_addr_w_stake_key_hash() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_stake_addr_w_vote_key() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetStakeAddrFromVoteKeyQuery::execute(&session, GetStakeAddrFromVoteKeyParams { @@ -153,8 +162,7 @@ async fn test_get_stake_addr_w_vote_key() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } @@ -163,7 +171,7 @@ async fn test_get_stake_addr_w_vote_key() { // from `testunit` CI run"] #[tokio::test] // async fn test_get_sync_status() { // let (session, _) = -// get_shared_session().await.unwrap(); +// get_shared_session().await // Ok(()) // } @@ -171,7 +179,9 @@ async fn test_get_stake_addr_w_vote_key() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_txi_by_txn_hashes() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetTxiByTxnHashesQuery::execute(&session, GetTxiByTxnHashesQueryParams::new(vec![])) @@ -179,15 +189,16 @@ async fn test_get_txi_by_txn_hashes() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_get_txo_by_stake_address() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; let mut row_stream = GetTxoByStakeAddressQuery::execute( &session, @@ -197,15 +208,16 @@ async fn test_get_txo_by_stake_address() { .unwrap(); while let Some(row_res) = row_stream.next().await { - let row = row_res.unwrap(); - drop(row); + drop(row_res.unwrap()); } } #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_insert_sync_status() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; SyncStatusInsertQuery::execute( &session, @@ -218,7 +230,9 @@ async fn test_insert_sync_status() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_update_txo_spent() { - let (session, _) = get_shared_session().await.unwrap(); + let Ok((session, _)) = get_shared_session().await else { + panic!("{SESSION_ERR_MSG}"); + }; UpdateTxoSpentQuery::execute(&session, vec![]) .await diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_session.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_session.rs index 3eeec951086..fff576b2e92 100644 --- a/catalyst-gateway/bin/src/db/index/tests/scylla_session.rs +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_session.rs @@ -5,5 +5,7 @@ use super::*; #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn test_session() { - get_shared_session().await.unwrap(); + if let Err(err) = get_shared_session().await { + panic!("{err}"); + } } diff --git a/catalyst-gateway/tests/Earthfile b/catalyst-gateway/tests/Earthfile index fd24409a0e0..4ad00f6a912 100644 --- a/catalyst-gateway/tests/Earthfile +++ b/catalyst-gateway/tests/Earthfile @@ -80,4 +80,4 @@ test-scylla: RUN --mount=$EARTHLY_RUST_CARGO_HOME_CACHE --mount=$EARTHLY_RUST_TARGET_CACHE \ scylla --options-file /etc/scylla/scylla.yaml --smp=2 --memory=4G --overprovisioned --developer-mode=1 & \ - cargo nextest run --release --run-ignored=only scylla_session scylla_queries + cargo nextest run --release --run-ignored=only scylla_session scylla_queries scylla_purge --no-fail-fast