diff --git a/.env.test b/.env.test
index 79098ea6..c27d8d43 100644
--- a/.env.test
+++ b/.env.test
@@ -25,7 +25,11 @@ SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.clou
 SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_worker_trigger_queue"
 
 ##### SNS #####
+
 ALERTS="sns"
+AWS_SNS_REGION="us-east-1"
+AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
+AWS_SNS_ARN_NAME="madara-orchestrator-arn"
 
 ##### DATABASE #####
 
@@ -57,6 +61,7 @@ STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C
 DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
 TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81"
 STARKNET_OPERATOR_ADDRESS="0x2C169DFe5fBbA12957Bdd0Ba47d9CEDbFE260CA7"
+ETHEREUM_BLAST_RPC_URL="https://eth-mainnet.public.blastapi.io"
 
 ##### E2E test vars #####
 
diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index b841c16b..2284f89b 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -1,15 +1,10 @@
 name: Rust Test & Coverage
 
 on:
-  pull_request_target:
-    branches:
-      - main
-    types: [opened, synchronize, reopened]
-  push:
-    branches-ignore:
-      - main
   workflow_call:
-  workflow_dispatch:
+    secrets:
+      ETHEREUM_BLAST_RPC_URL:
+        required: true
 
 jobs:
   coverage:
@@ -19,7 +14,7 @@ jobs:
       localstack:
         image: localstack/localstack
         env:
-          SERVICES: s3, sqs
+          SERVICES: s3, sqs, sns
           DEFAULT_REGION: us-east-1
           AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID"
           AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY"
@@ -74,7 +69,7 @@ jobs:
       - name: Run llvm-cov tests
        env:
          ETHEREUM_BLAST_RPC_URL: ${{ secrets.ETHEREUM_BLAST_RPC_URL }}
-        run: cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1
+        run: RUST_LOG=debug RUST_BACKTRACE=1 cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1
 
       - name: Coveralls
         uses: coverallsapp/github-action@v2
diff --git a/.github/workflows/linters-cargo.yml b/.github/workflows/linters-cargo.yml
index d3e5ff90..96172230 100644
--- a/.github/workflows/linters-cargo.yml
+++ b/.github/workflows/linters-cargo.yml
@@ -4,7 +4,6 @@ name: Task - Linters Cargo
 on:
   workflow_dispatch:
   workflow_call:
-  push:
 
 jobs:
   cargo-lint:
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 901b7f1f..42f8c8de 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -4,7 +4,6 @@ name: Task - Linters
 on:
   workflow_dispatch:
   workflow_call:
-  push:
 
 jobs:
   prettier:
diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml
index 2863c998..3698d8c5 100644
--- a/.github/workflows/pull-request.yml
+++ b/.github/workflows/pull-request.yml
@@ -2,7 +2,8 @@ name: Workflow - Pull Request
 
 on:
-  pull_request_target:
+  workflow_dispatch:
+  pull_request:
     branches: [main]
   push:
     branches: [main]
 
@@ -24,3 +25,4 @@ jobs:
     name: Run coverage
     uses: ./.github/workflows/coverage.yml
     needs: rust_build
+    secrets: inherit
diff --git a/.github/workflows/rust-build.yml b/.github/workflows/rust-build.yml
index 1406606b..f7cb3e08 100644
--- a/.github/workflows/rust-build.yml
+++ b/.github/workflows/rust-build.yml
@@ -4,7 +4,6 @@ name: Task - Build Rust
 on:
   workflow_dispatch:
   workflow_call:
-  push:
 
 jobs:
   rust_build:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6f1f690..c79ba808 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## Added
 
+- Alerts module.
 - Tests for Settlement client.
 - Worker queues to listen for trigger events.
 - Tests for prover client.
@@ -30,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## Changed
 
+- Refactor AWS config usage and clean `.env` files.
 - GitHub's coverage CI yml file for localstack and db testing.
 - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder.
 - `.env` file requires two more variables which are queue urls for processing
@@ -37,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## Removed
 
+- Revert CI changes from settlement client PR.
 - `init_config` from all the tests.
 - `fetch_from_test` argument
 
diff --git a/Cargo.lock b/Cargo.lock
index c86a9a72..0af0b68a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1794,6 +1794,29 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "aws-sdk-sns"
+version = "1.40.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dab2b9787b8d9d3094ace9585e785079cfc583199ec620ab067b599e8850c1a6"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-query",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-smithy-xml",
+ "aws-types",
+ "http 0.2.12",
+ "once_cell",
+ "regex-lite",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sdk-sqs"
 version = "1.36.0"
@@ -6431,6 +6454,7 @@ dependencies = [
  "async-trait",
  "aws-config",
  "aws-sdk-s3",
+ "aws-sdk-sns",
  "aws-sdk-sqs",
  "axum 0.7.5",
  "axum-macros",
diff --git a/crates/da-clients/ethereum/src/config.rs b/crates/da-clients/ethereum/src/config.rs
index b50604b2..34613486 100644
--- a/crates/da-clients/ethereum/src/config.rs
+++ b/crates/da-clients/ethereum/src/config.rs
@@ -19,14 +19,14 @@ pub struct EthereumDaConfig {
 impl DaConfig for EthereumDaConfig {
     fn new_from_env() -> Self {
         Self {
-            rpc_url: get_env_var_or_panic("ETHEREUM_RPC_URL"),
+            rpc_url: get_env_var_or_panic("SETTLEMENT_RPC_URL"),
             memory_pages_contract: get_env_var_or_panic("MEMORY_PAGES_CONTRACT_ADDRESS"),
             private_key: get_env_var_or_panic("PRIVATE_KEY"),
         }
     }
     async fn build_client(&self) -> EthereumDaClient {
         let client =
-            RpcClient::new_http(Url::from_str(self.rpc_url.as_str()).expect("Failed to parse ETHEREUM_RPC_URL"));
+            RpcClient::new_http(Url::from_str(self.rpc_url.as_str()).expect("Failed to parse SETTLEMENT_RPC_URL"));
         let provider = ProviderBuilder::<_, Ethereum>::new().on_client(client);
 
         EthereumDaClient { provider }
diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml
index 2e4fd97f..b27736a8 100644
--- a/crates/orchestrator/Cargo.toml
+++ b/crates/orchestrator/Cargo.toml
@@ -20,6 +20,7 @@ async-trait = { workspace = true }
 aws-config = { workspace = true }
 aws-sdk-s3 = { workspace = true }
 aws-sdk-sqs = { workspace = true }
+aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
 axum = { workspace = true, features = ["macros"] }
 axum-macros = { workspace = true }
 bincode = { workspace = true }
diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs
new file mode 100644
index 00000000..9aa95f9a
--- /dev/null
+++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs
@@ -0,0 +1,27 @@
+use crate::alerts::Alerts;
+use async_trait::async_trait;
+use aws_sdk_sns::config::Region;
+use aws_sdk_sns::Client;
+use utils::env_utils::get_env_var_or_panic;
+
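+/// Alerts client backed by AWS SNS. Wraps the `aws_sdk_sns` client and publishes alert messages to the configured topic.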
+pub struct AWSSNS {
+    client: Client,
+}
+
+impl AWSSNS {
+    /// To create a new SNS client
+    pub async fn new() -> Self {
+        let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
+        let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
+        AWSSNS { client: Client::new(&config) }
+    }
+}
+
+#[async_trait]
+impl Alerts for AWSSNS {
+    async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> {
+        let topic_arn = get_env_var_or_panic("AWS_SNS_ARN");
+        self.client.publish().topic_arn(topic_arn).message(message_body).send().await?;
+        Ok(())
+    }
+}
diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs
new file mode 100644
index 00000000..1e36129d
--- /dev/null
+++ b/crates/orchestrator/src/alerts/mod.rs
@@ -0,0 +1,11 @@
+use async_trait::async_trait;
+use mockall::automock;
+
+pub mod aws_sns;
+
+#[automock]
+#[async_trait]
+pub trait Alerts: Send + Sync {
+    /// To send an alert message to our alert service
+    async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
+}
diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs
index ea68fc7f..427230dc 100644
--- a/crates/orchestrator/src/config.rs
+++ b/crates/orchestrator/src/config.rs
@@ -3,10 +3,13 @@
 use alloy::providers::RootProvider;
 use std::str::FromStr;
 use std::sync::Arc;
-use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType, S3LocalStackConfig};
+use crate::alerts::aws_sns::AWSSNS;
+use crate::alerts::Alerts;
+use crate::data_storage::aws_s3::config::AWSS3Config;
 use crate::data_storage::aws_s3::AWSS3;
 use crate::data_storage::{DataStorage, DataStorageConfig};
 use arc_swap::{ArcSwap, Guard};
+use aws_config::SdkConfig;
 use da_client_interface::{DaClient, DaConfig};
 use dotenvy::dotenv;
 use ethereum_da_client::config::EthereumDaConfig;
@@ -45,6 +48,8 @@ pub struct Config {
     queue: Box<dyn QueueProvider>,
     /// Storage client
     storage: Box<dyn DataStorage>,
+    /// Alerts client
+    alerts: Box<dyn Alerts>,
 }
 
 /// Initializes the app config
@@ -57,10 +62,16 @@ pub async fn init_config() -> Config {
     ));
 
     // init database
-    let database = Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await);
+    let database = build_database_client().await;
+
+    // init AWS
+    let aws_config = aws_config::load_from_env().await;
 
     // init the queue
-    let queue = Box::new(SqsQueue {});
+    // TODO: we use omniqueue for now which doesn't support loading AWS config
+    // from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require
+    // us to stop using the generic omniqueue abstractions for message ack/nack
+    let queue = build_queue_client(&aws_config);
 
     let da_client = build_da_client().await;
 
@@ -68,13 +79,25 @@
     let settlement_client = build_settlement_client(&settings_provider).await;
 
     let prover_client = build_prover_service(&settings_provider);
-    let storage_client = build_storage_client().await;
+    let storage_client = build_storage_client(&aws_config).await;
+
+    let alerts_client = build_alert_client().await;
 
-    Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client)
+    Config::new(
+        Arc::new(provider),
+        da_client,
+        prover_client,
+        settlement_client,
+        database,
+        queue,
+        storage_client,
+        alerts_client,
+    )
 }
 
 impl Config {
     /// Create a new config
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         starknet_client: Arc<JsonRpcClient<HttpTransport>>,
         da_client: Box<dyn DaClient>,
@@ -83,8 +106,9 @@
         database: Box<dyn Database>,
         queue: Box<dyn QueueProvider>,
         storage: Box<dyn DataStorage>,
+        alerts: Box<dyn Alerts>,
     ) -> Self {
-        Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage }
+        Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts }
     }
 
     /// Returns the starknet client
@@ -121,6 +145,11 @@
     pub fn storage(&self) -> &dyn DataStorage {
         self.storage.as_ref()
     }
+
+    /// Returns the alerts client
+    pub fn alerts(&self) -> &dyn Alerts {
+        self.alerts.as_ref()
+    }
 }
 
 /// The app config. It can be accessed from anywhere inside the service.
@@ -186,12 +215,29 @@
     }
 }
 
-pub async fn build_storage_client() -> Box<dyn DataStorage> {
+pub async fn build_storage_client(aws_config: &SdkConfig) -> Box<dyn DataStorage> {
     match get_env_var_or_panic("DATA_STORAGE").as_str() {
-        "s3" => Box::new(AWSS3::new(AWSS3ConfigType::WithoutEndpoint(AWSS3Config::new_from_env())).await),
-        "s3_localstack" => {
-            Box::new(AWSS3::new(AWSS3ConfigType::WithEndpoint(S3LocalStackConfig::new_from_env())).await)
-        }
+        "s3" => Box::new(AWSS3::new(AWSS3Config::new_from_env(), aws_config)),
         _ => panic!("Unsupported Storage Client"),
     }
 }
+
+pub async fn build_alert_client() -> Box<dyn Alerts> {
+    match get_env_var_or_panic("ALERTS").as_str() {
+        "sns" => Box::new(AWSSNS::new().await),
+        _ => panic!("Unsupported Alert Client"),
+    }
+}
+pub fn build_queue_client(_aws_config: &SdkConfig) -> Box<dyn QueueProvider> {
+    match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
+        "sqs" => Box::new(SqsQueue {}),
+        _ => panic!("Unsupported Queue Client"),
+    }
+}
+
+pub async fn build_database_client() -> Box<dyn Database> {
+    match get_env_var_or_panic("DATABASE").as_str() {
+        "mongodb" => Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await),
+        _ => panic!("Unsupported Database Client"),
+    }
+}
diff --git a/crates/orchestrator/src/data_storage/aws_s3/config.rs b/crates/orchestrator/src/data_storage/aws_s3/config.rs
index 0970d80d..665caeb4 100644
--- a/crates/orchestrator/src/data_storage/aws_s3/config.rs
+++ b/crates/orchestrator/src/data_storage/aws_s3/config.rs
@@ -2,64 +2,17 @@ use utils::env_utils::get_env_var_or_panic;
 
 use crate::data_storage::DataStorageConfig;
 
-/// Represents the type of the config which one wants to pass to create the client
-#[derive(Clone)]
-pub enum AWSS3ConfigType {
-    WithEndpoint(S3LocalStackConfig),
-    WithoutEndpoint(AWSS3Config),
-}
-
 /// Represents AWS S3 config struct with all the necessary variables.
 #[derive(Clone)]
 pub struct AWSS3Config {
-    /// AWS ACCESS KEY ID
-    pub s3_key_id: String,
-    /// AWS ACCESS KEY SECRET
-    pub s3_key_secret: String,
     /// S3 Bucket Name
-    pub s3_bucket_name: String,
-    /// S3 Bucket region
-    pub s3_bucket_region: String,
-}
-
-/// Represents AWS S3 config struct with all the necessary variables.
-#[derive(Clone)]
-pub struct S3LocalStackConfig {
-    /// AWS ACCESS KEY ID
-    pub s3_key_id: String,
-    /// AWS ACCESS KEY SECRET
-    pub s3_key_secret: String,
-    /// S3 Bucket Name
-    pub s3_bucket_name: String,
-    /// S3 Bucket region
-    pub s3_bucket_region: String,
-    /// Endpoint url
-    pub endpoint_url: String,
+    pub bucket_name: String,
 }
 
 /// Implementation of `DataStorageConfig` for `AWSS3Config`
 impl DataStorageConfig for AWSS3Config {
     /// To return the config struct by creating it from the environment variables.
     fn new_from_env() -> Self {
-        Self {
-            s3_key_id: get_env_var_or_panic("AWS_ACCESS_KEY_ID"),
-            s3_key_secret: get_env_var_or_panic("AWS_SECRET_ACCESS_KEY"),
-            s3_bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME"),
-            s3_bucket_region: get_env_var_or_panic("AWS_S3_BUCKET_REGION"),
-        }
-    }
-}
-
-/// Implementation of `DataStorageConfig` for `S3LocalStackConfig`
-impl DataStorageConfig for S3LocalStackConfig {
-    /// To return the config struct by creating it from the environment variables.
-    fn new_from_env() -> Self {
-        Self {
-            s3_key_id: get_env_var_or_panic("AWS_ACCESS_KEY_ID"),
-            s3_key_secret: get_env_var_or_panic("AWS_SECRET_ACCESS_KEY"),
-            s3_bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME"),
-            s3_bucket_region: get_env_var_or_panic("AWS_S3_BUCKET_REGION"),
-            endpoint_url: get_env_var_or_panic("AWS_ENDPOINT_URL"),
-        }
+        Self { bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME") }
     }
 }
diff --git a/crates/orchestrator/src/data_storage/aws_s3/mod.rs b/crates/orchestrator/src/data_storage/aws_s3/mod.rs
index 1be1f711..3578b7c8 100644
--- a/crates/orchestrator/src/data_storage/aws_s3/mod.rs
+++ b/crates/orchestrator/src/data_storage/aws_s3/mod.rs
@@ -1,7 +1,7 @@
-use crate::data_storage::aws_s3::config::AWSS3ConfigType;
+use crate::data_storage::aws_s3::config::AWSS3Config;
 use crate::data_storage::DataStorage;
 use async_trait::async_trait;
-use aws_sdk_s3::config::{Builder, Credentials, Region};
+use aws_config::SdkConfig;
 use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::Client;
 use bytes::Bytes;
@@ -13,7 +13,7 @@ pub mod config;
 /// AWSS3 represents AWS S3 client object containing the client and the config itself.
 pub struct AWSS3 {
     client: Client,
-    config: AWSS3ConfigType,
+    bucket: String,
 }
 
 /// Implementation for AWS S3 client. Contains the function for :
@@ -22,63 +22,17 @@ pub struct AWSS3 {
 impl AWSS3 {
     /// Initializes a new AWS S3 client by passing the config
     /// and returning it.
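+    /// The client is built from the shared `SdkConfig`, so credentials and region come from the environment.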
-    pub async fn new(config: AWSS3ConfigType) -> Self {
-        let (config_builder, config) = match config {
-            AWSS3ConfigType::WithoutEndpoint(config) => {
-                let (credentials, region) = get_credentials_and_region_from_config(
-                    config.s3_key_id.clone(),
-                    config.s3_key_secret.clone(),
-                    config.s3_bucket_region.clone(),
-                );
-                (
-                    Builder::new().region(region).credentials_provider(credentials).force_path_style(true),
-                    AWSS3ConfigType::WithoutEndpoint(config),
-                )
-            }
-            AWSS3ConfigType::WithEndpoint(config) => {
-                let (credentials, region) = get_credentials_and_region_from_config(
-                    config.s3_key_id.clone(),
-                    config.s3_key_secret.clone(),
-                    config.s3_bucket_region.clone(),
-                );
-                (
-                    Builder::new()
-                        .region(region)
-                        .credentials_provider(credentials)
-                        .force_path_style(true)
-                        .endpoint_url(config.endpoint_url.clone()),
-                    AWSS3ConfigType::WithEndpoint(config),
-                )
-            }
-        };
-
-        let conf = config_builder.build();
-
+    pub fn new(s3_config: AWSS3Config, aws_config: &SdkConfig) -> Self {
         // Building AWS S3 config
-        let client = Client::from_conf(conf);
-
-        Self { client, config }
-    }
+        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config);
 
-    pub fn get_bucket_name(&self) -> String {
-        match self.config.clone() {
-            AWSS3ConfigType::WithEndpoint(config) => config.s3_bucket_name,
-            AWSS3ConfigType::WithoutEndpoint(config) => config.s3_bucket_name,
-        }
+        // this is necessary for it to work with localstack in test cases
+        s3_config_builder.set_force_path_style(Some(true));
+        let client = Client::from_conf(s3_config_builder.build());
+        Self { client, bucket: s3_config.bucket_name }
     }
 }
 
-/// Return the constructed `Credentials` and `Region`
-fn get_credentials_and_region_from_config(
-    s3_key_id: String,
-    s3_key_secret: String,
-    s3_bucket_region: String,
-) -> (Credentials, Region) {
-    let credentials = Credentials::new(s3_key_id, s3_key_secret, None, None, "loaded_from_custom_env");
-    let region = Region::new(s3_bucket_region);
-    (credentials, region)
-}
-
 /// Implementation of `DataStorage` for `AWSS3`
 /// contains the function for getting the data and putting the data
 /// by taking the key as an argument.
@@ -86,7 +40,7 @@ fn get_credentials_and_region_from_config(
 impl DataStorage for AWSS3 {
     /// Function to get the data from S3 bucket by Key.
     async fn get_data(&self, key: &str) -> Result<Bytes> {
-        let response = self.client.get_object().bucket(self.get_bucket_name()).key(key).send().await?;
+        let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
         let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
         let data_bytes = data_stream.into_bytes();
         Ok(data_bytes)
@@ -96,7 +50,7 @@
     async fn put_data(&self, data: Bytes, key: &str) -> Result<()> {
         self.client
             .put_object()
-            .bucket(self.get_bucket_name())
+            .bucket(&self.bucket)
             .key(key)
             .body(ByteStream::from(data))
             .content_type("application/json")
diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs
index e4ae1ed6..11da33d8 100644
--- a/crates/orchestrator/src/jobs/mod.rs
+++ b/crates/orchestrator/src/jobs/mod.rs
@@ -139,7 +139,8 @@ pub async fn create_job(
     }
 
     let job_handler = factory::get_job_handler(&job_type).await;
-    let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
+    let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?;
+    config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;
 
     add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
 
@@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> {
 
     let job_handler = factory::get_job_handler(&job.job_type).await;
     let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;
+
     let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
 
     // Fetching the job again because update status above will update the job version
@@ -239,8 +241,6 @@
                 );
                 add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
                 return Ok(());
-            } else {
-                // TODO: send alert
             }
         }
         JobVerificationStatus::Pending => {
@@ -248,7 +248,6 @@
             let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)
                 .map_err(|e| JobError::Other(OtherError(e)))?;
             if verify_attempts >= job_handler.max_verification_attempts() {
-                // TODO: send alert
                 log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id);
                 config
                     .database()
diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs
index 3d02378b..4212f381 100644
--- a/crates/orchestrator/src/lib.rs
+++ b/crates/orchestrator/src/lib.rs
@@ -1,3 +1,5 @@
+/// Contains the `Alerts` trait and its implementations for sending alerts
+pub mod alerts;
 /// Config of the service. Contains configurations for DB, Queues and other services.
 pub mod config;
 pub mod constants;
diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs
index 5defa8f9..9144303a 100644
--- a/crates/orchestrator/src/queue/job_queue.rs
+++ b/crates/orchestrator/src/queue/job_queue.rs
@@ -195,6 +195,13 @@ where
         }
         Err(e) => {
             log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
+            config()
+                .await
+                .alerts()
+                .send_alert_message(e.to_string())
+                .await
+                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
+
             match message.nack().await {
                 Ok(_) => Err(ConsumptionError::FailedToHandleJob {
                     job_id: job_message.id,
@@ -233,6 +240,13 @@
         }
         Err(e) => {
             log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
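+            // Publish an alert for the failed worker trigger before the message is nacked.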
+            config()
+                .await
+                .alerts()
+                .send_alert_message(e.to_string())
+                .await
+                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
+
             message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?;
             Err(ConsumptionError::FailedToSpawnWorker {
                 worker_trigger_type: job_message.worker,
diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs
new file mode 100644
index 00000000..2026061b
--- /dev/null
+++ b/crates/orchestrator/src/tests/alerts/mod.rs
@@ -0,0 +1,61 @@
+use crate::config::config;
+use crate::tests::common::{get_sns_client, get_sqs_client};
+use crate::tests::config::TestConfigBuilder;
+use aws_sdk_sqs::types::QueueAttributeName::QueueArn;
+use rstest::rstest;
+use std::time::Duration;
+use tokio::time::sleep;
+use utils::env_utils::get_env_var_or_panic;
+
+pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue";
+
+#[rstest]
+#[tokio::test]
+async fn sns_alert_subscribe_to_topic_receive_alert_works() {
+    TestConfigBuilder::new().build().await;
+
+    let sqs_client = get_sqs_client().await;
+    let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap();
+    let queue_url = queue.queue_url().unwrap();
+
+    let sns_client = get_sns_client().await;
+    let config = config().await;
+
+    let queue_attributes =
+        sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap();
+
+    let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap();
+
+    // Subscribing the queue to the alerts SNS topic
+    sns_client
+        .subscribe()
+        .topic_arn(get_env_var_or_panic("AWS_SNS_ARN").as_str())
+        .protocol("sqs")
+        .endpoint(queue_arn)
+        .send()
+        .await
+        .unwrap();
+
+    let message_to_send = "Hello World :)";
+
+    // Getting the alerts client from the config
+    let alerts_client = config.alerts();
+    // Sending the alert message
+    alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap();
+
+    sleep(Duration::from_secs(5)).await;
+
+    // Checking the queue for the message
+    let receive_message_result = sqs_client
+        .receive_message()
+        .queue_url(queue_url)
+        .max_number_of_messages(1)
+        .send()
+        .await
+        .unwrap()
+        .messages
+        .unwrap();
+
+    assert_eq!(receive_message_result.len(), 1, "Alert message length assertion failed");
+    assert!(receive_message_result[0].body.clone().unwrap().contains(message_to_send));
+}
diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs
index 64dfde1b..1fa5d1c2 100644
--- a/crates/orchestrator/src/tests/common/mod.rs
+++ b/crates/orchestrator/src/tests/common/mod.rs
@@ -4,11 +4,14 @@ use std::collections::HashMap;
 
 use ::uuid::Uuid;
 use aws_config::Region;
+use aws_sdk_sns::error::SdkError;
+use aws_sdk_sns::operation::create_topic::CreateTopicError;
 use mongodb::Client;
 use rstest::*;
 use serde::Deserialize;
+use utils::env_utils::get_env_var_or_panic;
 
-use crate::data_storage::aws_s3::config::{AWSS3ConfigType, S3LocalStackConfig};
+use crate::data_storage::aws_s3::config::AWSS3Config;
 use crate::data_storage::aws_s3::AWSS3;
 use crate::data_storage::{DataStorage, DataStorageConfig};
 use crate::database::mongodb::config::MongoDbConfig;
@@ -40,6 +43,18 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))]
     job_item
 }
 
+pub async fn create_sns_arn() -> Result<(), SdkError<CreateTopicError>> {
+    let sns_client = get_sns_client().await;
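+    // Creates the SNS topic used by the tests; the topic name comes from AWS_SNS_ARN_NAME.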
+    sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?;
+    Ok(())
+}
+
+pub async fn get_sns_client() -> aws_sdk_sns::client::Client {
+    let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
+    let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
+    aws_sdk_sns::Client::new(&config)
+}
+
 pub async fn drop_database() -> color_eyre::Result<()> {
     let db_client: Client = MongoDb::new(MongoDbConfig::new_from_env()).await.client();
     // dropping all the collection.
@@ -71,7 +86,7 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> {
     Ok(())
 }
 
-async fn get_sqs_client() -> aws_sdk_sqs::Client {
+pub async fn get_sqs_client() -> aws_sdk_sqs::Client {
     // This function is for localstack. So we can hardcode the region for this as of now.
     let region_provider = Region::new("us-east-1");
     let config = aws_config::from_env().region(region_provider).load().await;
@@ -84,5 +99,7 @@ pub struct MessagePayloadType {
 }
 
 pub async fn get_storage_client() -> Box<dyn DataStorage> {
-    Box::new(AWSS3::new(AWSS3ConfigType::WithEndpoint(S3LocalStackConfig::new_from_env())).await)
+    let aws_config =
+        aws_config::load_from_env().await.into_builder().endpoint_url(get_env_var_or_panic("AWS_ENDPOINT_URL")).build();
+    Box::new(AWSS3::new(AWSS3Config::new_from_env(), &aws_config))
 }
diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs
index 3c9e4ad6..0cd182e5 100644
--- a/crates/orchestrator/src/tests/config.rs
+++ b/crates/orchestrator/src/tests/config.rs
@@ -1,10 +1,13 @@
 use std::sync::Arc;
 
-use crate::config::{build_da_client, build_prover_service, build_settlement_client, config_force_init, Config};
+use crate::config::{
+    build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, Config,
+};
 use crate::data_storage::DataStorage;
 
 use da_client_interface::DaClient;
 use httpmock::MockServer;
+use crate::alerts::Alerts;
 use prover_client_interface::ProverClient;
 use settlement_client_interface::SettlementClient;
 use starknet::providers::jsonrpc::HttpTransport;
@@ -17,7 +20,7 @@ use crate::database::mongodb::MongoDb;
 use crate::database::{Database, DatabaseConfig};
 use crate::queue::sqs::SqsQueue;
 use crate::queue::QueueProvider;
-use crate::tests::common::{create_sqs_queues, drop_database, get_storage_client};
+use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client};
 
 // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html
 // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement.
@@ -39,6 +42,8 @@ pub struct TestConfigBuilder {
     queue: Option<Box<dyn QueueProvider>>,
     /// Storage client
     storage: Option<Box<dyn DataStorage>>,
+    /// Alerts client
+    alerts: Option<Box<dyn Alerts>>,
 }
 
 impl Default for TestConfigBuilder {
@@ -58,6 +63,7 @@
             database: None,
             queue: None,
             storage: None,
+            alerts: None,
         }
     }
 
@@ -96,6 +102,11 @@
         self
     }
 
+    pub fn mock_alerts(mut self, alerts: Box<dyn Alerts>) -> TestConfigBuilder {
+        self.alerts = Some(alerts);
+        self
+    }
+
     pub async fn build(mut self) -> MockServer {
         dotenvy::from_filename("../.env.test").expect("Failed to load the .env file");
 
@@ -132,10 +143,16 @@
             }
         }
 
+        if self.alerts.is_none() {
+            self.alerts = Some(build_alert_client().await);
+        }
+
         // Deleting and Creating the queues in sqs.
         create_sqs_queues().await.expect("Not able to delete and create the queues.");
         // Deleting the database
         drop_database().await.expect("Unable to drop the database.");
+        // Creating the SNS ARN
+        create_sns_arn().await.expect("Unable to create the sns arn");
 
         let config = Config::new(
             self.starknet_client.unwrap_or_else(|| {
@@ -150,6 +167,7 @@
             self.database.unwrap(),
             self.queue.unwrap_or_else(|| Box::new(SqsQueue {})),
             self.storage.unwrap(),
+            self.alerts.unwrap(),
         );
 
         config_force_init(config).await;
diff --git a/crates/orchestrator/src/tests/data_storage/mod.rs b/crates/orchestrator/src/tests/data_storage/mod.rs
index d127917a..a3055acb 100644
--- a/crates/orchestrator/src/tests/data_storage/mod.rs
+++ b/crates/orchestrator/src/tests/data_storage/mod.rs
@@ -1,7 +1,6 @@
-use crate::data_storage::aws_s3::config::{AWSS3ConfigType, S3LocalStackConfig};
+use crate::data_storage::aws_s3::config::AWSS3Config;
 use crate::data_storage::aws_s3::AWSS3;
 use crate::data_storage::{DataStorage, DataStorageConfig};
-use crate::tests::config::TestConfigBuilder;
 use bytes::Bytes;
 use rstest::rstest;
 use serde_json::json;
@@ -14,12 +13,12 @@
 #[rstest]
 #[tokio::test]
 async fn test_put_and_get_data_s3() -> color_eyre::Result<()> {
-    TestConfigBuilder::new().build().await;
-
     dotenvy::from_filename("../.env.test")?;
 
-    let config = S3LocalStackConfig::new_from_env();
-    let s3_client = AWSS3::new(AWSS3ConfigType::WithEndpoint(config)).await;
+    let config = AWSS3Config::new_from_env();
+    let aws_config =
+        aws_config::load_from_env().await.into_builder().endpoint_url(get_env_var_or_panic("AWS_ENDPOINT_URL")).build();
+    let s3_client = AWSS3::new(config, &aws_config);
     s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
 
     let mock_data = json!(
diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs
index 1dbc21a2..4f264304 100644
--- a/crates/orchestrator/src/tests/mod.rs
+++ b/crates/orchestrator/src/tests/mod.rs
@@ -7,6 +7,7 @@ pub mod server;
 
 pub mod queue;
 
+pub mod alerts;
 pub mod common;
 mod data_storage;
 pub mod workers;
diff --git a/crates/settlement-clients/ethereum/src/config.rs b/crates/settlement-clients/ethereum/src/config.rs
index 294c2a54..3d038390 100644
--- a/crates/settlement-clients/ethereum/src/config.rs
+++ b/crates/settlement-clients/ethereum/src/config.rs
@@ -6,8 +6,8 @@ use url::Url;
 use utils::env_utils::get_env_var_or_panic;
 
 pub const ENV_CORE_CONTRACT_ADDRESS: &str = "STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS";
-pub const DEFAULT_SETTLEMENT_CLIENT_RPC: &str = "DEFAULT_SETTLEMENT_CLIENT_RPC";
-pub const DEFAULT_L1_CORE_CONTRACT_ADDRESS: &str = "DEFAULT_L1_CORE_CONTRACT_ADDRESS";
+pub const SETTLEMENT_RPC_URL: &str = "SETTLEMENT_RPC_URL";
+pub const L1_CORE_CONTRACT_ADDRESS: &str = "L1_CORE_CONTRACT_ADDRESS";
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct EthereumSettlementConfig {
@@ -17,9 +17,8 @@
 impl SettlementConfig for EthereumSettlementConfig {
     fn new_from_env() -> Self {
-        let rpc_url = get_env_var_or_panic(DEFAULT_SETTLEMENT_CLIENT_RPC);
-        let rpc_url =
-            Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", DEFAULT_SETTLEMENT_CLIENT_RPC));
+        let rpc_url = get_env_var_or_panic(SETTLEMENT_RPC_URL);
+        let rpc_url = Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", SETTLEMENT_RPC_URL));
         let core_contract_address = get_env_var_or_panic(ENV_CORE_CONTRACT_ADDRESS);
         Self { rpc_url, core_contract_address }
     }
 }
@@ -28,8 +27,8 @@
 impl Default for EthereumSettlementConfig {
     fn default() -> Self {
         Self {
-            rpc_url: get_env_var_or_panic(DEFAULT_SETTLEMENT_CLIENT_RPC).parse().unwrap(),
-            core_contract_address: get_env_var_or_panic(DEFAULT_L1_CORE_CONTRACT_ADDRESS),
+            rpc_url: get_env_var_or_panic(SETTLEMENT_RPC_URL).parse().unwrap(),
+            core_contract_address: get_env_var_or_panic(L1_CORE_CONTRACT_ADDRESS),
         }
     }
 }
diff --git a/crates/settlement-clients/ethereum/src/tests/mod.rs b/crates/settlement-clients/ethereum/src/tests/mod.rs
index 65694e46..078cbe53 100644
--- a/crates/settlement-clients/ethereum/src/tests/mod.rs
+++ b/crates/settlement-clients/ethereum/src/tests/mod.rs
@@ -228,7 +228,7 @@ async fn update_state_blob_with_impersonation_works(#[case] fork_block_no: u64)
     // Asserting, Expected to receive transaction hash.
     assert!(!update_state_result.is_empty(), "No transaction Hash received.");
 
-    sleep(Duration::from_secs(2)).await;
+    sleep(Duration::from_secs(5)).await;
     ethereum_settlement_client
         .wait_for_tx_finality(update_state_result.as_str())
         .await