Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(cat-gateway): Add purge queries to Scylla integration test #1579

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions catalyst-gateway/bin/src/cardano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle<SyncParams> {
params.backoff().await;

// Wait for indexing DB to be ready before continuing.
CassandraSession::wait_is_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
info!(chain=%params.chain, params=%params,"Indexing DB is ready");

let mut first_indexed_block = params.first_indexed_block.clone();
Expand Down Expand Up @@ -377,7 +377,7 @@ impl SyncTask {
// Wait for indexing DB to be ready before continuing.
// We do this after the above, because other nodes may have finished already, and we don't
// want to wait do any work they already completed while we were fetching the blockchain.
CassandraSession::wait_is_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
self.sync_status = get_sync_status().await;
debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all Chain Root For Role0 Key registration primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get Chain Root For Role0 Key registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get Chain Root For Role0 Key registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all Chain Root For Role0 Key registration primary keys.
Expand All @@ -103,16 +101,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete Chain Root For Role0 Key registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,17 @@ impl PrimaryKeyQuery {
/// Prepares a query to get all Chain Root For Stake Address registration primary
/// keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get Chain Root For Stake Address registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get Chain Root For Stake Address registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all Chain Root For Stake Address registration primary
Expand Down Expand Up @@ -105,16 +103,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete Chain Root For Stake Address registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all Chain Root For TX ID registration primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get Chain Root For TX ID registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get Chain Root For TX ID registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all Chain Root For TX ID registration primary keys.
Expand All @@ -100,16 +98,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete Chain Root For TX ID registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all CIP-36 registration primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get CIP-36 registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get CIP-36 registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all CIP-36 registration primary keys.
Expand All @@ -107,16 +105,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete CIP-36 registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all CIP-36 registration by Vote key primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all CIP-36 registration by Vote key primary keys.
Expand All @@ -111,16 +109,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete CIP-36 registration by Vote key primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all CIP-36 invalid registration primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all CIP-36 invalid registration primary keys.
Expand All @@ -103,16 +101,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete CIP-36 invalid registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,17 @@ pub(crate) struct PrimaryKeyQuery;
impl PrimaryKeyQuery {
/// Prepares a query to get all RBAC 509 registration primary keys.
pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
let select_primary_key = PreparedQueries::prepare(
PreparedQueries::prepare(
session.clone(),
SELECT_QUERY,
scylla::statement::Consistency::All,
true,
)
.await;

if let Err(ref error) = select_primary_key {
error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query");
};

select_primary_key
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
}

/// Executes a query to get all RBAC 509 registration primary keys.
Expand All @@ -111,16 +109,19 @@ impl DeleteQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let delete_queries = PreparedQueries::prepare_batch(
PreparedQueries::prepare_batch(
session.clone(),
DELETE_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await?;
Ok(delete_queries)
.await
.inspect_err(
|error| error!(error=%error, "Failed to prepare delete RBAC 509 registration primary key query."),
)
.map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
}

/// Executes a DELETE Query
Expand Down
Loading
Loading