diff --git a/Cargo.lock b/Cargo.lock index 2b9c25ae..e895e6d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9679,6 +9679,8 @@ dependencies = [ name = "starknet-settlement-client" version = "0.1.0" dependencies = [ + "alloy 0.2.1", + "alloy-primitives 0.7.7", "appchain-core-contract-client", "async-std", "async-trait", @@ -10875,7 +10877,6 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" name = "utils" version = "0.1.0" dependencies = [ - "clap", "color-eyre", "opentelemetry", "opentelemetry-appender-tracing", diff --git a/Cargo.toml b/Cargo.toml index 9f44c13b..ab790150 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,8 @@ rstest = "0.22.0" serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" starknet = "0.11.0" +strum = "0.26.0" +strum_macros = "0.26.0" tempfile = "3.12.0" thiserror = "1.0.57" tokio = { version = "1.37.0" } diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 0994416c..fb15e1a1 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -63,8 +63,8 @@ starknet = { workspace = true } starknet-core = "0.9.0" starknet-os = { workspace = true } starknet-settlement-client = { workspace = true } -strum = "0.26.0" -strum_macros = "0.26.0" +strum = { workspace = true } +strum_macros = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } diff --git a/crates/orchestrator/src/cli/cron/event_bridge.rs b/crates/orchestrator/src/cli/cron/event_bridge.rs index 8719a597..321eab9b 100644 --- a/crates/orchestrator/src/cli/cron/event_bridge.rs +++ b/crates/orchestrator/src/cli/cron/event_bridge.rs @@ -9,14 +9,13 @@ pub struct AWSEventBridgeCliArgs { pub aws_event_bridge: bool, /// The name of the S3 bucket. - #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-target-queue-name"))] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-target-queue-name"), help = "The name of the SNS queue to send messages to from the event bridge.")] pub target_queue_name: Option, - /// The cron time for the event bridge trigger rule. - #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("10"))] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("10"), help = "The cron time for the event bridge trigger rule. Defaults to 10 seconds.")] pub cron_time: Option, /// The name of the event bridge trigger rule. - #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-trigger-rule-name"))] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-trigger-rule-name"), help = "The name of the event bridge trigger rule.")] pub trigger_rule_name: Option, } diff --git a/crates/orchestrator/src/cli/mod.rs b/crates/orchestrator/src/cli/mod.rs index edef75e0..886738c3 100644 --- a/crates/orchestrator/src/cli/mod.rs +++ b/crates/orchestrator/src/cli/mod.rs @@ -1,6 +1,8 @@ +use std::str::FromStr as _; use std::time::Duration; use alert::AlertValidatedArgs; +use alloy::primitives::Address; use clap::{ArgGroup, Parser, Subcommand}; use cron::event_bridge::AWSEventBridgeCliArgs; use cron::CronValidatedArgs; @@ -179,7 +181,7 @@ impl RunCmd { } pub fn validate_alert_params(&self) -> Result { - if self.aws_sns_args.aws_sns { + if self.aws_sns_args.aws_sns && self.aws_config_args.aws { Ok(AlertValidatedArgs::AWSSNS(AWSSNSValidatedArgs { topic_arn: self.aws_sns_args.sns_arn.clone().expect("SNS ARN is required"), })) @@ -189,9 +191,12 @@ impl RunCmd { } pub fn validate_queue_params(&self) -> Result { - if self.aws_sqs_args.aws_sqs { + if self.aws_sqs_args.aws_sqs && self.aws_config_args.aws { Ok(QueueValidatedArgs::AWSSQS(AWSSQSValidatedArgs { - queue_base_url: self.aws_sqs_args.queue_base_url.clone().expect("Queue base URL is required"), + queue_base_url: Url::parse( + &self.aws_sqs_args.queue_base_url.clone().expect("Queue base URL is required"), + ) + .expect("Invalid queue base URL"), sqs_prefix: self.aws_sqs_args.sqs_prefix.clone().expect("SQS prefix is required"), sqs_suffix: self.aws_sqs_args.sqs_suffix.clone().expect("SQS suffix is required"), })) @@ -201,7 +206,7 @@ impl RunCmd { } pub fn validate_storage_params(&self) -> Result { - if self.aws_s3_args.aws_s3 { + if self.aws_s3_args.aws_s3 && self.aws_config_args.aws { Ok(StorageValidatedArgs::AWSS3(AWSS3ValidatedArgs { bucket_name: self.aws_s3_args.bucket_name.clone().expect("Bucket name is required"), })) @@ -213,11 +218,10 @@ impl RunCmd { pub fn validate_database_params(&self) -> Result { if self.mongodb_args.mongodb { Ok(DatabaseValidatedArgs::MongoDB(MongoDBValidatedArgs { - connection_url: self - .mongodb_args - .mongodb_connection_url - .clone() - .expect("MongoDB connection URL is required"), + connection_url: Url::parse( + &self.mongodb_args.mongodb_connection_url.clone().expect("MongoDB connection URL is required"), + ) + .expect("Invalid MongoDB connection URL"), database_name: self .mongodb_args .mongodb_database_name @@ -244,67 +248,55 @@ impl RunCmd { } pub fn validate_settlement_params(&self) -> Result { - match (self.ethereum_args.settle_on_ethereum, self.starknet_args.settle_on_starknet) { - (true, false) => { - let ethereum_params = EthereumSettlementValidatedArgs { - ethereum_rpc_url: self - .ethereum_args - .ethereum_rpc_url - .clone() - .expect("Ethereum RPC URL is required"), - ethereum_private_key: self - .ethereum_args - .ethereum_private_key - .clone() - .expect("Ethereum private key is required"), - l1_core_contract_address: self - .ethereum_args - .l1_core_contract_address - .clone() - .expect("L1 core contract address is required"), - starknet_operator_address: self - .ethereum_args - .starknet_operator_address - .clone() - .expect("Starknet operator address is required"), - }; - Ok(SettlementValidatedArgs::Ethereum(ethereum_params)) - } - (false, true) => { - let starknet_params = StarknetSettlementValidatedArgs { - starknet_rpc_url: self - .starknet_args - .starknet_rpc_url - .clone() - .expect("Starknet RPC URL is required"), - starknet_private_key: self - .starknet_args - .starknet_private_key - .clone() - .expect("Starknet private key is required"), - starknet_account_address: self - .starknet_args - .starknet_account_address - .clone() - .expect("Starknet account address is required"), - starknet_cairo_core_contract_address: self + if self.ethereum_args.settle_on_ethereum { + let l1_core_contract_address = Address::from_str( + &self.ethereum_args.l1_core_contract_address.clone().expect("L1 core contract address is required"), + ) + .expect("Invalid L1 core contract address"); + let starknet_operator_address = Address::from_str( + &self.ethereum_args.starknet_operator_address.clone().expect("Starknet operator address is required"), + ) + .expect("Invalid Starknet operator address"); + + let ethereum_params = EthereumSettlementValidatedArgs { + ethereum_rpc_url: self.ethereum_args.ethereum_rpc_url.clone().expect("Ethereum RPC URL is required"), + ethereum_private_key: self + .ethereum_args + .ethereum_private_key + .clone() + .expect("Ethereum private key is required"), + l1_core_contract_address, + starknet_operator_address, + }; + Ok(SettlementValidatedArgs::Ethereum(ethereum_params)) + } else if self.starknet_args.settle_on_starknet { + let starknet_params = StarknetSettlementValidatedArgs { + starknet_rpc_url: self.starknet_args.starknet_rpc_url.clone().expect("Starknet RPC URL is required"), + starknet_private_key: self + .starknet_args + .starknet_private_key + .clone() + .expect("Starknet private key is required"), + starknet_account_address: Address::from_str( + &self.starknet_args.starknet_account_address.clone().expect("Starknet account address is required"), + ) + .expect("Invalid Starknet account address"), + starknet_cairo_core_contract_address: Address::from_str( + &self .starknet_args .starknet_cairo_core_contract_address .clone() .expect("Starknet Cairo core contract address is required"), - starknet_finality_retry_wait_in_secs: self - .starknet_args - .starknet_finality_retry_wait_in_secs - .expect("Starknet finality retry wait in seconds is required"), - madara_binary_path: self - .starknet_args - .starknet_madara_binary_path - .clone() - .expect("Starknet Madara binary path is required"), - }; - Ok(SettlementValidatedArgs::Starknet(starknet_params)) - } - (true, true) | (false, false) => Err("Exactly one settlement layer must be selected".to_string()), + ) + .expect("Invalid Starknet Cairo core contract address"), + starknet_finality_retry_wait_in_secs: self + .starknet_args + .starknet_finality_retry_wait_in_secs + .expect("Starknet finality retry wait in seconds is required"), + }; + Ok(SettlementValidatedArgs::Starknet(starknet_params)) + } else { + Err("Settlement layer is required".to_string()) } } @@ -339,7 +331,7 @@ impl RunCmd { .instrumentation_args .otel_service_name .clone() - .expect("OTel service name is required"), + .expect("Otel service name is required"), otel_collector_endpoint: self.instrumentation_args.otel_collector_endpoint.clone(), log_level: self.instrumentation_args.log_level, }) @@ -440,7 +432,7 @@ impl SetupCmd { } pub fn validate_alert_params(&self) -> Result { - if self.aws_sns_args.aws_sns { + if self.aws_sns_args.aws_sns && self.aws_config_args.aws { Ok(AlertValidatedArgs::AWSSNS(AWSSNSValidatedArgs { topic_arn: self.aws_sns_args.sns_arn.clone().expect("SNS ARN is required"), })) @@ -450,9 +442,12 @@ impl SetupCmd { } pub fn validate_queue_params(&self) -> Result { - if self.aws_sqs_args.aws_sqs { + if self.aws_sqs_args.aws_sqs && self.aws_config_args.aws { Ok(QueueValidatedArgs::AWSSQS(AWSSQSValidatedArgs { - queue_base_url: self.aws_sqs_args.queue_base_url.clone().expect("Queue base URL is required"), + queue_base_url: Url::parse( + &self.aws_sqs_args.queue_base_url.clone().expect("Queue base URL is required"), + ) + .expect("Invalid queue base URL"), sqs_prefix: self.aws_sqs_args.sqs_prefix.clone().expect("SQS prefix is required"), sqs_suffix: self.aws_sqs_args.sqs_suffix.clone().expect("SQS suffix is required"), })) @@ -462,7 +457,7 @@ impl SetupCmd { } pub fn validate_storage_params(&self) -> Result { - if self.aws_s3_args.aws_s3 { + if self.aws_s3_args.aws_s3 && self.aws_config_args.aws { Ok(StorageValidatedArgs::AWSS3(AWSS3ValidatedArgs { bucket_name: self.aws_s3_args.bucket_name.clone().expect("Bucket name is required"), })) @@ -472,7 +467,7 @@ impl SetupCmd { } pub fn validate_cron_params(&self) -> Result { - if self.aws_event_bridge_args.aws_event_bridge { + if self.aws_event_bridge_args.aws_event_bridge && self.aws_config_args.aws { Ok(CronValidatedArgs::AWSEventBridge(AWSEventBridgeValidatedArgs { target_queue_name: self .aws_event_bridge_args @@ -580,7 +575,6 @@ impl SetupCmd { // starknet_account_address: Some("".to_string()), // starknet_cairo_core_contract_address: Some("".to_string()), // starknet_finality_retry_wait_in_secs: Some(0), -// starknet_madara_binary_path: Some("".to_string()), // settle_on_starknet: false, // }, diff --git a/crates/orchestrator/src/cli/settlement/starknet.rs b/crates/orchestrator/src/cli/settlement/starknet.rs index 0d74ec82..9a171b22 100644 --- a/crates/orchestrator/src/cli/settlement/starknet.rs +++ b/crates/orchestrator/src/cli/settlement/starknet.rs @@ -27,8 +27,4 @@ pub struct StarknetSettlementCliArgs { /// The number of seconds to wait for finality. #[arg(env = "MADARA_ORCHESTRATOR_STARKNET_FINALITY_RETRY_WAIT_IN_SECS", long)] pub starknet_finality_retry_wait_in_secs: Option, - - /// The path to the Madara binary. - #[arg(env = "MADARA_ORCHESTRATOR_MADARA_BINARY_PATH", long)] - pub starknet_madara_binary_path: Option, } diff --git a/crates/orchestrator/src/cron/event_bridge.rs b/crates/orchestrator/src/cron/event_bridge.rs index b9019e5c..b8b5c6e3 100644 --- a/crates/orchestrator/src/cron/event_bridge.rs +++ b/crates/orchestrator/src/cron/event_bridge.rs @@ -17,13 +17,19 @@ pub struct AWSEventBridgeValidatedArgs { } pub struct AWSEventBridge { + target_queue_name: String, + cron_time: Duration, + trigger_rule_name: String, client: EventBridgeClient, queue_client: SqsClient, } impl AWSEventBridge { - pub fn new_with_args(_params: &AWSEventBridgeValidatedArgs, aws_config: &SdkConfig) -> Self { + pub fn new_with_args(params: &AWSEventBridgeValidatedArgs, aws_config: &SdkConfig) -> Self { Self { + target_queue_name: params.target_queue_name.clone(), + cron_time: params.cron_time, + trigger_rule_name: params.trigger_rule_name.clone(), client: aws_sdk_eventbridge::Client::new(aws_config), queue_client: aws_sdk_sqs::Client::new(aws_config), } @@ -33,24 +39,19 @@ impl AWSEventBridge { #[async_trait] #[allow(unreachable_patterns)] impl Cron for AWSEventBridge { - async fn create_cron(&self, cron_time: Duration, trigger_rule_name: String) -> color_eyre::Result<()> { + async fn create_cron(&self) -> color_eyre::Result<()> { self.client .put_rule() - .name(&trigger_rule_name) - .schedule_expression(duration_to_rate_string(cron_time)) + .name(&self.trigger_rule_name) + .schedule_expression(duration_to_rate_string(self.cron_time)) .state(RuleState::Enabled) .send() .await?; Ok(()) } - async fn add_cron_target_queue( - &self, - target_queue_name: String, - message: String, - trigger_rule_name: String, - ) -> color_eyre::Result<()> { - let queue_url = self.queue_client.get_queue_url().queue_name(target_queue_name).send().await?; + async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()> { + let queue_url = self.queue_client.get_queue_url().queue_name(&self.target_queue_name).send().await?; let queue_attributes = self .queue_client @@ -67,7 +68,7 @@ impl Cron for AWSEventBridge { self.client .put_targets() - .rule(trigger_rule_name) + .rule(&self.trigger_rule_name) .targets( Target::builder() .id(uuid::Uuid::new_v4().to_string()) diff --git a/crates/orchestrator/src/cron/mod.rs b/crates/orchestrator/src/cron/mod.rs index ec1804b0..5f1ed51a 100644 --- a/crates/orchestrator/src/cron/mod.rs +++ b/crates/orchestrator/src/cron/mod.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use async_trait::async_trait; use lazy_static::lazy_static; @@ -8,36 +6,22 @@ use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType}; pub mod event_bridge; lazy_static! { - pub static ref CRON_DURATION: Duration = Duration::from_mins(1); - // TODO : we can take this from clap. - pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue"); pub static ref WORKER_TRIGGERS: Vec = vec![ WorkerTriggerType::Snos, WorkerTriggerType::Proving, WorkerTriggerType::DataSubmission, WorkerTriggerType::UpdateState ]; - pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled"); } #[async_trait] pub trait Cron { - async fn create_cron(&self, cron_time: Duration, trigger_rule_name: String) -> color_eyre::Result<()>; - async fn add_cron_target_queue( - &self, - target_queue_name: String, - message: String, - trigger_rule_name: String, - ) -> color_eyre::Result<()>; + async fn create_cron(&self) -> color_eyre::Result<()>; + async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()>; async fn setup(&self) -> color_eyre::Result<()> { - self.create_cron(*CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?; + self.create_cron().await?; for triggers in WORKER_TRIGGERS.iter() { - self.add_cron_target_queue( - TARGET_QUEUE_NAME.clone(), - get_worker_trigger_message(triggers.clone())?, - WORKER_TRIGGER_RULE_NAME.clone(), - ) - .await?; + self.add_cron_target_queue(get_worker_trigger_message(triggers.clone())?).await?; } Ok(()) } diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index da644111..4f18eae8 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -10,6 +10,7 @@ use mongodb::options::{ UpdateOptions, }; use mongodb::{bson, Client, Collection}; +use url::Url; use utils::ToDocument; use uuid::Uuid; @@ -21,7 +22,7 @@ mod utils; #[derive(Debug, Clone)] pub struct MongoDBValidatedArgs { - pub connection_url: String, + pub connection_url: Url, pub database_name: String, } diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 3d8c7499..57fa8f83 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -9,12 +9,12 @@ use color_eyre::Result as EyreResult; use lazy_static::lazy_static; use mockall::automock; use omniqueue::{Delivery, QueueError}; -use strum_macros::Display; +use strum_macros::{Display, EnumIter}; use crate::config::Config; use crate::jobs::JobError; -#[derive(Display, Debug, Clone, PartialEq, Eq)] +#[derive(Display, Debug, Clone, PartialEq, Eq, EnumIter)] pub enum QueueType { #[strum(serialize = "snos_job_processing")] SnosJobProcessing, @@ -42,27 +42,6 @@ pub enum QueueType { WorkerTrigger, } -impl QueueType { - pub fn iter() -> impl Iterator { - [ - QueueType::SnosJobProcessing, - QueueType::SnosJobVerification, - QueueType::ProvingJobProcessing, - QueueType::ProvingJobVerification, - QueueType::ProofRegistrationJobProcessing, - QueueType::ProofRegistrationJobVerification, - QueueType::DataSubmissionJobProcessing, - QueueType::DataSubmissionJobVerification, - QueueType::UpdateStateJobProcessing, - QueueType::UpdateStateJobVerification, - QueueType::JobHandleFailure, - QueueType::WorkerTrigger, - ] - .iter() - .cloned() - } -} - #[derive(Clone)] pub struct DlqConfig { pub max_receive_count: i32, diff --git a/crates/orchestrator/src/queue/sqs/mod.rs b/crates/orchestrator/src/queue/sqs/mod.rs index 54a18e16..dbb167c5 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -10,20 +10,21 @@ use color_eyre::Result; use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer}; use omniqueue::{Delivery, QueueError}; use serde::Serialize; +use url::Url; use super::QueueType; use crate::queue::{QueueConfig, QueueProvider}; #[derive(Debug, Clone, Serialize)] pub struct AWSSQSValidatedArgs { - pub queue_base_url: String, + pub queue_base_url: Url, pub sqs_prefix: String, pub sqs_suffix: String, } pub struct SqsQueue { client: Client, - queue_base_url: String, + queue_base_url: Url, sqs_prefix: String, sqs_suffix: String, } diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 5c1a9703..54c43511 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -11,6 +11,7 @@ use chrono::{SubsecRound, Utc}; use mongodb::Client; use rstest::*; use serde::Deserialize; +use strum::IntoEnumIterator as _; use crate::cli::alert::AlertValidatedArgs; use crate::cli::database::DatabaseValidatedArgs; diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 20bd1d06..811e5ec2 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use std::str::FromStr as _; use std::sync::Arc; +use alloy::primitives::Address; use axum::Router; use da_client_interface::{DaClient, MockDaClient}; use ethereum_da_client::EthereumDaValidatedArgs; @@ -483,7 +484,8 @@ struct EnvParams { fn get_env_params() -> EnvParams { let db_params = DatabaseValidatedArgs::MongoDB(MongoDBValidatedArgs { - connection_url: get_env_var_or_panic("MADARA_ORCHESTRATOR_MONGODB_CONNECTION_URL"), + connection_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_MONGODB_CONNECTION_URL")) + .expect("Invalid MongoDB connection URL"), database_name: get_env_var_or_panic("MADARA_ORCHESTRATOR_DATABASE_NAME"), }); @@ -492,7 +494,8 @@ fn get_env_params() -> EnvParams { }); let queue_params = QueueValidatedArgs::AWSSQS(AWSSQSValidatedArgs { - queue_base_url: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_BASE_QUEUE_URL"), + queue_base_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_BASE_QUEUE_URL")) + .expect("Invalid queue base URL"), sqs_prefix: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_PREFIX"), sqs_suffix: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_SUFFIX"), }); @@ -516,8 +519,14 @@ fn get_env_params() -> EnvParams { ethereum_rpc_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_ETHEREUM_SETTLEMENT_RPC_URL")) .expect("Failed to parse MADARA_ORCHESTRATOR_ETHEREUM_RPC_URL"), ethereum_private_key: get_env_var_or_panic("MADARA_ORCHESTRATOR_ETHEREUM_PRIVATE_KEY"), - l1_core_contract_address: get_env_var_or_panic("MADARA_ORCHESTRATOR_L1_CORE_CONTRACT_ADDRESS"), - starknet_operator_address: get_env_var_or_panic("MADARA_ORCHESTRATOR_STARKNET_OPERATOR_ADDRESS"), + l1_core_contract_address: Address::from_str(&get_env_var_or_panic( + "MADARA_ORCHESTRATOR_L1_CORE_CONTRACT_ADDRESS", + )) + .expect("Invalid L1 core contract address"), + starknet_operator_address: Address::from_str(&get_env_var_or_panic( + "MADARA_ORCHESTRATOR_STARKNET_OPERATOR_ADDRESS", + )) + .expect("Invalid Starknet operator address"), }); let snos_config = SNOSParams { diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index d7900b21..d5364516 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -1,3 +1,4 @@ +use std::cmp::{max, min}; use std::collections::HashMap; use std::sync::Arc; @@ -23,7 +24,7 @@ impl Worker for SnosWorker { let block_number_provider = provider.block_number().await?; let latest_block_number = if let Some(max_block_to_process) = config.service_config().max_block_to_process { - max_block_to_process + min(max_block_to_process, block_number_provider) } else { block_number_provider }; @@ -33,15 +34,15 @@ impl Worker for SnosWorker { let latest_job_in_db = config.database().get_latest_job_by_type(JobType::SnosRun).await?; let latest_job_id = match latest_job_in_db { - Some(job) => job.internal_id, - None => "0".to_string(), + Some(job) => job.internal_id.parse::().unwrap(), + None => "0".to_string().parse::().unwrap(), }; // To be used when testing in specific block range let block_start = if let Some(min_block_to_process) = config.service_config().min_block_to_process { - min_block_to_process + max(min_block_to_process, latest_job_id) } else { - latest_job_id.parse::().unwrap() + latest_job_id }; for block_num in block_start..latest_block_number + 1 { diff --git a/crates/settlement-clients/ethereum/Cargo.toml b/crates/settlement-clients/ethereum/Cargo.toml index a458bb1d..987672f7 100644 --- a/crates/settlement-clients/ethereum/Cargo.toml +++ b/crates/settlement-clients/ethereum/Cargo.toml @@ -5,7 +5,6 @@ edition.workspace = true [dependencies] alloy-primitives = { version = "0.7.7", default-features = false } - alloy = { workspace = true, features = ["full", "node-bindings"] } async-trait = { workspace = true } bytes = "1.7.2" diff --git a/crates/settlement-clients/ethereum/src/lib.rs b/crates/settlement-clients/ethereum/src/lib.rs index 4c32da4c..55cc3fe9 100644 --- a/crates/settlement-clients/ethereum/src/lib.rs +++ b/crates/settlement-clients/ethereum/src/lib.rs @@ -67,9 +67,9 @@ pub struct EthereumSettlementValidatedArgs { pub ethereum_private_key: String, - pub l1_core_contract_address: String, + pub l1_core_contract_address: Address, - pub starknet_operator_address: String, + pub starknet_operator_address: Address, } #[allow(dead_code)] @@ -99,13 +99,8 @@ impl EthereumSettlementClient { .on_http(settlement_cfg.ethereum_rpc_url.clone()), ); - let core_contract_client = StarknetValidityContractClient::new( - Address::from_str(&settlement_cfg.l1_core_contract_address) - .expect("Failed to convert the validity contract address.") - .0 - .into(), - filler_provider, - ); + let core_contract_client = + StarknetValidityContractClient::new(settlement_cfg.l1_core_contract_address, filler_provider); EthereumSettlementClient { provider, core_contract_client, wallet, wallet_address, impersonate_account: None } } diff --git a/crates/settlement-clients/starknet/Cargo.toml b/crates/settlement-clients/starknet/Cargo.toml index 861df735..42be56a9 100644 --- a/crates/settlement-clients/starknet/Cargo.toml +++ b/crates/settlement-clients/starknet/Cargo.toml @@ -4,6 +4,8 @@ version.workspace = true edition.workspace = true [dependencies] +alloy-primitives = { version = "0.7.7", default-features = false } +alloy = { workspace = true, features = ["full", "node-bindings"] } appchain-core-contract-client = { workspace = true } async-trait = { workspace = true } c-kzg = { workspace = true } diff --git a/crates/settlement-clients/starknet/src/lib.rs b/crates/settlement-clients/starknet/src/lib.rs index 2b2bd676..00d8245c 100644 --- a/crates/settlement-clients/starknet/src/lib.rs +++ b/crates/settlement-clients/starknet/src/lib.rs @@ -5,6 +5,7 @@ pub mod tests; use std::sync::Arc; +use alloy_primitives::Address; use appchain_core_contract_client::clients::StarknetCoreContractClient; use appchain_core_contract_client::interfaces::core_contract::CoreContract; use async_trait::async_trait; @@ -43,16 +44,10 @@ use url::Url; #[derive(Clone, Debug)] pub struct StarknetSettlementValidatedArgs { pub starknet_rpc_url: Url, - pub starknet_private_key: String, - - pub starknet_account_address: String, - - pub starknet_cairo_core_contract_address: String, - + pub starknet_account_address: Address, + pub starknet_cairo_core_contract_address: Address, pub starknet_finality_retry_wait_in_secs: u64, - - pub madara_binary_path: String, } // Assumed the contract called for settlement looks like: @@ -63,7 +58,7 @@ impl StarknetSettlementClient { let provider: Arc> = Arc::new(JsonRpcClient::new(HttpTransport::new(settlement_cfg.starknet_rpc_url.clone()))); - let public_key = settlement_cfg.starknet_account_address.clone(); + let public_key = settlement_cfg.starknet_account_address.clone().to_string(); let signer_address = Felt::from_hex(&public_key).expect("invalid signer address"); // TODO: Very insecure way of building the signer. Needs to be adjusted. @@ -71,7 +66,7 @@ impl StarknetSettlementClient { let signer = Felt::from_hex(&private_key).expect("Invalid private key"); let signer = LocalWallet::from(SigningKey::from_secret_scalar(signer)); - let core_contract_address = Felt::from_hex(&settlement_cfg.starknet_cairo_core_contract_address) + let core_contract_address = Felt::from_hex(&settlement_cfg.starknet_cairo_core_contract_address.to_string()) .expect("Invalid core contract address"); let account: Arc>, LocalWallet>> = diff --git a/crates/settlement-clients/starknet/src/tests/test.rs b/crates/settlement-clients/starknet/src/tests/test.rs index 2bbd0a59..cd0a6c85 100644 --- a/crates/settlement-clients/starknet/src/tests/test.rs +++ b/crates/settlement-clients/starknet/src/tests/test.rs @@ -1,8 +1,10 @@ use std::env; use std::path::Path; +use std::str::FromStr as _; use std::sync::Arc; use std::time::Duration; +use alloy_primitives::Address; use color_eyre::eyre::eyre; use rstest::{fixture, rstest}; use settlement_client_interface::SettlementClient; @@ -82,16 +84,19 @@ async fn setup(#[future] spin_up_madara: MadaraCmd) -> (LocalWalletSignerMiddlew let starknet_settlement_params: StarknetSettlementValidatedArgs = StarknetSettlementValidatedArgs { starknet_rpc_url: Url::parse(madara_process.rpc_url.as_ref()).unwrap(), starknet_private_key: get_env_var_or_panic("MADARA_ORCHESTRATOR_STARKNET_PRIVATE_KEY"), - starknet_account_address: get_env_var_or_panic("MADARA_ORCHESTRATOR_STARKNET_ACCOUNT_ADDRESS"), - starknet_cairo_core_contract_address: get_env_var_or_panic( + starknet_account_address: Address::from_str(&get_env_var_or_panic( + "MADARA_ORCHESTRATOR_STARKNET_ACCOUNT_ADDRESS", + )) + .unwrap(), + starknet_cairo_core_contract_address: Address::from_str(&get_env_var_or_panic( "MADARA_ORCHESTRATOR_STARKNET_CAIRO_CORE_CONTRACT_ADDRESS", - ), + )) + .unwrap(), starknet_finality_retry_wait_in_secs: get_env_var_or_panic( "MADARA_ORCHESTRATOR_STARKNET_FINALITY_RETRY_WAIT_IN_SECS", ) .parse::() .unwrap(), - madara_binary_path: get_env_var_or_panic("MADARA_ORCHESTRATOR_MADARA_BINARY_PATH"), }; let rpc_url = Url::parse(starknet_settlement_params.starknet_rpc_url.as_ref()).unwrap(); @@ -100,7 +105,7 @@ async fn setup(#[future] spin_up_madara: MadaraCmd) -> (LocalWalletSignerMiddlew let signer = LocalWallet::from(SigningKey::from_secret_scalar( Felt::from_hex(&starknet_settlement_params.starknet_private_key).expect("Invalid private key"), )); - let address = Felt::from_hex(&starknet_settlement_params.starknet_account_address).unwrap(); + let address = Felt::from_hex(&starknet_settlement_params.starknet_account_address.to_string()).unwrap(); let chain_id = provider.chain_id().await.unwrap(); let mut account = SingleOwnerAccount::new(provider, signer, address, chain_id, ExecutionEncoding::New); @@ -121,16 +126,19 @@ async fn test_settle(#[future] setup: (LocalWalletSignerMiddleware, MadaraCmd)) let mut starknet_settlement_params: StarknetSettlementValidatedArgs = StarknetSettlementValidatedArgs { starknet_rpc_url: madara_process.rpc_url.clone(), starknet_private_key: get_env_var_or_panic("MADARA_ORCHESTRATOR_STARKNET_PRIVATE_KEY"), - starknet_account_address: get_env_var_or_panic("MADARA_ORCHESTRATOR_STARKNET_ACCOUNT_ADDRESS"), - starknet_cairo_core_contract_address: get_env_var_or_panic( + starknet_account_address: Address::from_str(&get_env_var_or_panic( + "MADARA_ORCHESTRATOR_STARKNET_ACCOUNT_ADDRESS", + )) + .unwrap(), + starknet_cairo_core_contract_address: Address::from_str(&get_env_var_or_panic( "MADARA_ORCHESTRATOR_STARKNET_CAIRO_CORE_CONTRACT_ADDRESS", - ), + )) + .unwrap(), starknet_finality_retry_wait_in_secs: get_env_var_or_panic( "MADARA_ORCHESTRATOR_STARKNET_FINALITY_RETRY_WAIT_IN_SECS", ) .parse::() .unwrap(), - madara_binary_path: get_env_var_or_panic("MADARA_ORCHESTRATOR_MADARA_BINARY_PATH"), }; let project_root = Path::new(env!("CARGO_MANIFEST_DIR")).ancestors().nth(3).unwrap(); @@ -162,7 +170,8 @@ async fn test_settle(#[future] setup: (LocalWalletSignerMiddleware, MadaraCmd)) let deployed_address = deploy_v1.deployed_address(); // env::set_var("STARKNET_CAIRO_CORE_CONTRACT_ADDRESS", deployed_address.to_hex_string()); - starknet_settlement_params.starknet_cairo_core_contract_address = deployed_address.to_hex_string(); + starknet_settlement_params.starknet_cairo_core_contract_address = + Address::from_str(&deployed_address.to_hex_string()).unwrap(); let InvokeTransactionResult { transaction_hash: deploy_tx_hash } = deploy_v1.send().await.expect("Unable to deploy contract"); diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 537df164..ab351dee 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -6,7 +6,6 @@ edition.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap = { version = "4.4", features = ["derive", "env"] } color-eyre = { workspace = true } serde.workspace = true serde_json = { workspace = true } diff --git a/e2e-tests/Cargo.toml b/e2e-tests/Cargo.toml index dfb042d6..547301a1 100644 --- a/e2e-tests/Cargo.toml +++ b/e2e-tests/Cargo.toml @@ -26,8 +26,8 @@ rstest.workspace = true serde.workspace = true serde_json.workspace = true starknet.workspace = true -strum = "0.26.0" -strum_macros = "0.26.0" +strum = { workspace = true } +strum_macros = { workspace = true } testcontainers.workspace = true tokio = { workspace = true, features = ["full"] } tokio-stream.workspace = true diff --git a/e2e-tests/src/mongodb.rs b/e2e-tests/src/mongodb.rs index cc9a28a0..3f29c1cd 100644 --- a/e2e-tests/src/mongodb.rs +++ b/e2e-tests/src/mongodb.rs @@ -1,5 +1,3 @@ -use std::str::FromStr; - use orchestrator::database::mongodb::MongoDBValidatedArgs; use url::Url; #[allow(dead_code)] @@ -9,7 +7,7 @@ pub struct MongoDbServer { impl MongoDbServer { pub fn run(mongodb_params: MongoDBValidatedArgs) -> Self { - Self { endpoint: Url::from_str(&mongodb_params.connection_url).unwrap() } + Self { endpoint: mongodb_params.connection_url } } pub fn endpoint(&self) -> Url { diff --git a/e2e-tests/src/node.rs b/e2e-tests/src/node.rs index 030e3a09..bef74615 100644 --- a/e2e-tests/src/node.rs +++ b/e2e-tests/src/node.rs @@ -36,15 +36,22 @@ pub enum OrchestratorMode { } impl Orchestrator { - pub fn setup(envs: Vec<(String, String)>) { - let port = get_free_port(); + pub fn new(mode: OrchestratorMode, mut envs: Vec<(String, String)>) -> Option { let repository_root = &get_repository_root(); - let port_str = format!("{}", port); - std::env::set_current_dir(repository_root).expect("Failed to change working directory"); - let envs = [envs, vec![("MADARA_ORCHESTRATOR_PORT".to_string(), port_str)]].concat(); - println!("Running orchestrator in Setup mode"); + let (mode_str, is_run_mode) = match mode { + OrchestratorMode::Setup => { + println!("Running orchestrator in Setup mode"); + (OrchestratorMode::Setup.to_string(), false) + } + OrchestratorMode::Run => { + println!("Running orchestrator in Run mode"); + (OrchestratorMode::Run.to_string(), true) + } + }; + + // Configure common command arguments let mut command = Command::new("cargo"); command .arg("run") @@ -53,100 +60,71 @@ impl Orchestrator { .arg("orchestrator") .arg("--features") .arg("testing") + .arg(mode_str) .arg("--") - // if mode::Run then --run else if mode::Setup then --setup - .arg("--setup") + .arg("--aws") .arg("--settle-on-ethereum") .arg("--aws-s3") .arg("--aws-sqs") .arg("--aws-sns") .arg("--mongodb") .arg("--sharp") - .arg("--da-on-ethereum") - .arg("--aws-event-bridge") - .current_dir(repository_root) - .envs(envs) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - let mut process = command.spawn().expect("Failed to start process"); + .arg("--da-on-ethereum"); - // Wait for the process to complete and get its exit status - let status = process.wait().expect("Failed to wait for process"); - - // You can check if the process succeeded - if status.success() { - println!("Setup Orchestrator completed successfully"); - } else { - // Get the exit code if available - if let Some(code) = status.code() { - println!("Setup Orchestrator failed with exit code: {}", code); - } else { - println!("Setup Orchestrator terminated by signal"); - } + // Add event bridge arg only for setup mode + if !is_run_mode { + command.arg("--aws-event-bridge"); } - } - - pub fn run(envs: Vec<(String, String)>) -> Self { - let port = get_free_port(); - let address = format!("127.0.0.1:{}", port); - let repository_root = &get_repository_root(); - - std::env::set_current_dir(repository_root).expect("Failed to change working directory"); - let port_str = format!("{}", port); - let envs = [envs, vec![("MADARA_ORCHESTRATOR_PORT".to_string(), port_str)]].concat(); + // Configure run-specific settings + let address = if is_run_mode { + let port = get_free_port(); + let addr = format!("127.0.0.1:{}", port); + envs.push(("MADARA_ORCHESTRATOR_PORT".to_string(), port.to_string())); + addr + } else { + String::new() + }; - println!("Running orchestrator in Run mode"); - let mut command = Command::new("cargo"); - command - .arg("run") - .arg("--release") - .arg("--bin") - .arg("orchestrator") - .arg("--features") - .arg("testing") - .arg("--") - // if mode::Run then --run else if mode::Setup then --setup - .arg("--run") - .arg("--settle-on-ethereum") - .arg("--aws-s3") - .arg("--aws-sqs") - .arg("--aws-sns") - .arg("--mongodb") - .arg("--sharp") - .arg("--da-on-ethereum") - .arg("--aws-event-bridge") - .current_dir(repository_root) - .envs(envs) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); + command.current_dir(repository_root).envs(envs).stdout(Stdio::piped()).stderr(Stdio::piped()); let mut process = command.spawn().expect("Failed to start process"); - // Capture and print stdout - let stdout = process.stdout.take().expect("Failed to capture stdout"); - thread::spawn(move || { - let reader = BufReader::new(stdout); - reader.lines().for_each(|line| { - if let Ok(line) = line { - println!("STDOUT: {}", line); - } + if is_run_mode { + // Set up stdout and stderr handling for run mode + let stdout = process.stdout.take().expect("Failed to capture stdout"); + thread::spawn(move || { + let reader = BufReader::new(stdout); + reader.lines().for_each(|line| { + if let Ok(line) = line { + println!("STDOUT: {}", line); + } + }); }); - }); - - // Capture and print stderr - let stderr = process.stderr.take().expect("Failed to capture stderr"); - thread::spawn(move || { - let reader = BufReader::new(stderr); - reader.lines().for_each(|line| { - if let Ok(line) = line { - eprintln!("STDERR: {}", line); - } + + let stderr = process.stderr.take().expect("Failed to capture stderr"); + thread::spawn(move || { + let reader = BufReader::new(stderr); + reader.lines().for_each(|line| { + if let Ok(line) = line { + eprintln!("STDERR: {}", line); + } + }); }); - }); - Self { process, address } + Some(Self { process, address }) + } else { + // Handle setup mode + let status = process.wait().expect("Failed to wait for process"); + if status.success() { + println!("Setup Orchestrator completed successfully"); + } else if let Some(code) = status.code() { + println!("Setup Orchestrator failed with exit code: {}", code); + } else { + println!("Setup Orchestrator terminated by signal"); + } + None + } } pub fn endpoint(&self) -> Url { diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs index 097ab312..2cd91a59 100644 --- a/e2e-tests/tests.rs +++ b/e2e-tests/tests.rs @@ -24,6 +24,7 @@ use rstest::rstest; use serde::{Deserialize, Serialize}; use serde_json::json; use starknet::core::types::{Felt, MaybePendingStateUpdate}; +use url::Url; use utils::env_utils::get_env_var_or_panic; use uuid::Uuid; @@ -51,7 +52,8 @@ impl Setup { /// Initialise a new setup pub async fn new(l2_block_number: String) -> Self { let db_params = DatabaseValidatedArgs::MongoDB(MongoDBValidatedArgs { - connection_url: get_env_var_or_panic("MADARA_ORCHESTRATOR_MONGODB_CONNECTION_URL"), + connection_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_MONGODB_CONNECTION_URL")) + .expect("Invalid MongoDB connection URL"), database_name: get_env_var_or_panic("MADARA_ORCHESTRATOR_DATABASE_NAME"), }); @@ -126,10 +128,13 @@ impl Setup { async fn test_orchestrator_workflow(#[case] l2_block_number: String) { // Fetching the env vars from the test env file as these will be used in // setting up of the test and during orchestrator run too. + + use e2e_tests::node::OrchestratorMode; dotenvy::from_filename(".env.test").expect("Failed to load the .env file"); let queue_params = AWSSQSValidatedArgs { - queue_base_url: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_BASE_QUEUE_URL"), + queue_base_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_BASE_QUEUE_URL")) + .expect("Invalid queue base URL"), sqs_prefix: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_PREFIX"), sqs_suffix: get_env_var_or_panic("MADARA_ORCHESTRATOR_SQS_SUFFIX"), }; @@ -137,7 +142,7 @@ async fn test_orchestrator_workflow(#[case] l2_block_number: String) { let mut setup_config = Setup::new(l2_block_number.clone()).await; // Setup Cloud // Setup orchestrator cloud - Orchestrator::setup(setup_config.envs()); + Orchestrator::new(OrchestratorMode::Setup, setup_config.envs()); println!("✅ Orchestrator cloud setup completed"); // Step 1 : SNOS job runs ========================================= @@ -160,7 +165,8 @@ async fn test_orchestrator_workflow(#[case] l2_block_number: String) { println!("✅ Orchestrator setup completed."); // Run orchestrator - let mut orchestrator = Orchestrator::run(setup_config.envs()); + let mut orchestrator = + Orchestrator::new(OrchestratorMode::Run, setup_config.envs()).expect("Failed to start orchestrator"); orchestrator.wait_till_started().await; println!("✅ Orchestrator started");