From 2b1d505ca26f2c9fb2802251d3442a5ed237585b Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Tue, 29 Oct 2024 12:36:48 -0300 Subject: [PATCH 01/14] intial window implementation --- .../services/bls_aggregation/src/bls_agg.rs | 279 +++++++++++------- 1 file changed, 170 insertions(+), 109 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 80147481..9d781da7 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -1,4 +1,4 @@ -use alloy_primitives::{FixedBytes, U256}; +use alloy_primitives::{FixedBytes, Uint, U256}; use ark_bn254::G2Affine; use ark_ec::AffineRepr; use eigen_crypto_bls::{BlsG1Point, BlsG2Point, Signature}; @@ -18,7 +18,7 @@ use tokio::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, Mutex, }, - time::{timeout, Duration}, + time::Duration, }; /// The response from the BLS aggregation service @@ -48,6 +48,8 @@ pub enum BlsAggregationServiceError { SignatureVerificationError(SignatureVerificationError), #[error("channel was closed")] ChannelClosed, + #[error("signatures channel was closed")] + SignatureChannelClosed, #[error("error sending to channel")] ChannelError, #[error("Avs Registry Error")] @@ -275,7 +277,7 @@ impl BlsAggregatorService aggregated_response_sender: UnboundedSender< Result, >, - mut rx: UnboundedReceiver, + signatures_rx: UnboundedReceiver, ) -> Result<(), BlsAggregationServiceError> { let quorum_threshold_percentage_map: HashMap = quorum_nums .iter() @@ -303,117 +305,176 @@ impl BlsAggregatorService .map(|avs_state| avs_state.agg_pub_key_g1.clone()) .collect(); - let mut aggregated_operators: HashMap, AggregatedOperators> = HashMap::new(); - - // iterate over the signed task responses receive from the channel, until the time to expiry is reached or the channel is closed - while let Some(signed_task_digest) = timeout(time_to_expiry, rx.recv()) - .await - .inspect_err(|_err| { - // timeout - println!("expire"); - let _ = - aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired)); - }) - .map_err(|_| BlsAggregationServiceError::TaskExpired)? - { - // check if the operator has already signed for this digest - if aggregated_operators - .get(&signed_task_digest.task_response_digest) - .map(|operators| { - operators - .signers_operator_ids_set - .contains_key(&signed_task_digest.operator_id) - }) - .unwrap_or(false) - { - signed_task_digest - .signature_verification_channel - .send(Err(SignatureVerificationError::DuplicateSignature)) - .await - .map_err(|_| BlsAggregationServiceError::ChannelError)?; - continue; - } - - let verification_result = BlsAggregatorService::::verify_signature( - task_index, - &signed_task_digest, - &operator_state_avs, - ) - .await; - let verification_failed = verification_result.is_err(); - - signed_task_digest - .signature_verification_channel - .send(verification_result) - .await - .map_err(|_| BlsAggregationServiceError::ChannelError)?; - - if verification_failed { - continue; - } + Self::loop_task_aggregator( + avs_registry_service, + task_index, + task_created_block, + time_to_expiry, + aggregated_response_sender, + signatures_rx, + operator_state_avs, + total_stake_per_quorum, + quorum_threshold_percentage_map, + quorum_apks_g1, + quorum_nums, + ) + .await + } - let operator_state = operator_state_avs - .get(&signed_task_digest.operator_id) - .unwrap(); - - let operator_g2_pubkey = operator_state - .operator_info - .pub_keys - .clone() - .unwrap() - .g2_pub_key - .g2(); - - let digest_aggregated_operators = aggregated_operators - .get_mut(&signed_task_digest.task_response_digest) - .map(|digest_aggregated_operators| { - BlsAggregatorService::::aggregate_new_operator( + #[allow(clippy::too_many_arguments)] + async fn loop_task_aggregator( + avs_registry_service: A, + task_index: TaskIndex, + task_created_block: u32, + time_to_expiry: Duration, + aggregated_response_sender: UnboundedSender< + Result, + >, + mut signatures_rx: UnboundedReceiver, + operator_state_avs: HashMap, OperatorAvsState>, + total_stake_per_quorum: HashMap>, + quorum_threshold_percentage_map: HashMap, + quorum_apks_g1: Vec, + quorum_nums: Vec, + ) -> Result<(), BlsAggregationServiceError> { + let mut aggregated_operators: HashMap, AggregatedOperators> = HashMap::new(); + let mut open_window = false; + let mut current_aggregated_response: Option = None; + let (window_tx, mut window_rx) = tokio::sync::mpsc::unbounded_channel::(); + let task_expired_timer = tokio::time::sleep(time_to_expiry); + tokio::pin!(task_expired_timer); + + loop { + tokio::select! { + _ = &mut task_expired_timer => { + // Task expired. If window is open, send aggregated reponse. Else, send error + if open_window { + aggregated_response_sender + .send(Ok(current_aggregated_response.unwrap())) + .map_err(|_| BlsAggregationServiceError::ChannelError)?; + } else { + let _ = aggregated_response_sender.send(Err(BlsAggregationServiceError::TaskExpired)); + } + return Ok(()); + }, + _ = window_rx.recv() => { + // Window finished. Send aggregated response + aggregated_response_sender + .send(Ok(current_aggregated_response.unwrap())) + .map_err(|_| BlsAggregationServiceError::ChannelError)?; + return Ok(()); + }, + signed_task_digest = signatures_rx.recv() =>{ + // New signature, aggregate it. If threshold is met, start window + + let Some(digest) = signed_task_digest else { + return Err(BlsAggregationServiceError::SignatureChannelClosed); + }; + // check if the operator has already signed for this digest + if aggregated_operators + .get(&digest.task_response_digest) + .map(|operators| { + operators + .signers_operator_ids_set + .contains_key(&digest.operator_id) + }) + .unwrap_or(false) + { + digest + .signature_verification_channel + .send(Err(SignatureVerificationError::DuplicateSignature)) + .await + .map_err(|_| BlsAggregationServiceError::ChannelError)?; + continue; + } + + let verification_result = BlsAggregatorService::::verify_signature( + task_index, + &digest, + &operator_state_avs, + ) + .await; + let verification_failed = verification_result.is_err(); + + digest + .signature_verification_channel + .send(verification_result) + .await + .map_err(|_| BlsAggregationServiceError::ChannelError)?; + + if verification_failed { + continue; + } + + let operator_state = operator_state_avs + .get(&digest.operator_id) + .unwrap(); + + let operator_g2_pubkey = operator_state + .operator_info + .pub_keys + .clone() + .unwrap() + .g2_pub_key + .g2(); + + let digest_aggregated_operators = aggregated_operators + .get_mut(&digest.task_response_digest) + .map(|digest_aggregated_operators| { + BlsAggregatorService::::aggregate_new_operator( + digest_aggregated_operators, + operator_state.clone(), + digest.clone(), + ) + .clone() + }) + .unwrap_or(AggregatedOperators { + signers_apk_g2: BlsG2Point::new((G2Affine::zero() + operator_g2_pubkey).into()), + signers_agg_sig_g1: digest.bls_signature.clone(), + signers_operator_ids_set: HashMap::from([( + operator_state.operator_id.into(), + true, + )]), + signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(), + }); + + aggregated_operators.insert( + digest.task_response_digest, + digest_aggregated_operators.clone(), + ); + + if !BlsAggregatorService::::check_if_stake_thresholds_met( + &digest_aggregated_operators.signers_total_stake_per_quorum, + &total_stake_per_quorum, + &quorum_threshold_percentage_map, + ) { + continue; + } + + if !open_window { + open_window = true; + let sender_cloned = window_tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + let _ = sender_cloned.send(true); + }); + } + + current_aggregated_response = Some(BlsAggregatorService::build_aggregated_response( + task_index, + task_created_block, + digest, + &operator_state_avs, digest_aggregated_operators, - operator_state.clone(), - signed_task_digest.clone(), + &avs_registry_service, + &quorum_apks_g1, + &quorum_nums, ) - .clone() - }) - .unwrap_or(AggregatedOperators { - signers_apk_g2: BlsG2Point::new((G2Affine::zero() + operator_g2_pubkey).into()), - signers_agg_sig_g1: signed_task_digest.bls_signature.clone(), - signers_operator_ids_set: HashMap::from([( - operator_state.operator_id.into(), - true, - )]), - signers_total_stake_per_quorum: operator_state.stake_per_quorum.clone(), - }); - - aggregated_operators.insert( - signed_task_digest.task_response_digest, - digest_aggregated_operators.clone(), - ); - - if !BlsAggregatorService::::check_if_stake_thresholds_met( - &digest_aggregated_operators.signers_total_stake_per_quorum, - &total_stake_per_quorum, - &quorum_threshold_percentage_map, - ) { - continue; - } + .await?); - let bls_aggregation_service_response = BlsAggregatorService::build_aggregated_response( - task_index, - task_created_block, - signed_task_digest, - &operator_state_avs, - digest_aggregated_operators, - &avs_registry_service, - &quorum_apks_g1, - &quorum_nums, - ) - .await?; - - aggregated_response_sender - .send(Ok(bls_aggregation_service_response)) - .map_err(|_| BlsAggregationServiceError::ChannelError)?; + } + } } - Err(BlsAggregationServiceError::ChannelClosed) } /// Builds the aggregated response containing all the aggregation info. From 6ec6081fb84e6725a4239393252992a58a1ec484 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Tue, 29 Oct 2024 12:56:51 -0300 Subject: [PATCH 02/14] add window duration as parameter --- .../services/bls_aggregation/src/bls_agg.rs | 61 ++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 9d781da7..92d2a9b5 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -317,6 +317,64 @@ impl BlsAggregatorService quorum_threshold_percentage_map, quorum_apks_g1, quorum_nums, + Duration::from_secs(2), + ) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn single_task_aggregator_with_window( + avs_registry_service: A, + task_index: TaskIndex, + task_created_block: u32, + quorum_nums: Vec, + quorum_threshold_percentages: QuorumThresholdPercentages, + time_to_expiry: Duration, + aggregated_response_sender: UnboundedSender< + Result, + >, + signatures_rx: UnboundedReceiver, + window_duration: Duration, + ) -> Result<(), BlsAggregationServiceError> { + let quorum_threshold_percentage_map: HashMap = quorum_nums + .iter() + .enumerate() + .map(|(i, quorum_number)| (*quorum_number, quorum_threshold_percentages[i])) + .collect(); + + let operator_state_avs = avs_registry_service + .get_operators_avs_state_at_block(task_created_block, &quorum_nums) + .await + .map_err(|_| BlsAggregationServiceError::RegistryError)?; + + let quorums_avs_state = avs_registry_service + .get_quorums_avs_state_at_block(&quorum_nums, task_created_block) + .await + .map_err(|_| BlsAggregationServiceError::RegistryError)?; + let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state + .iter() + .map(|(k, v)| (*k, v.total_stake)) + .collect(); + + let quorum_apks_g1: Vec = quorum_nums + .iter() + .filter_map(|quorum_num| quorums_avs_state.get(quorum_num)) + .map(|avs_state| avs_state.agg_pub_key_g1.clone()) + .collect(); + + Self::loop_task_aggregator( + avs_registry_service, + task_index, + task_created_block, + time_to_expiry, + aggregated_response_sender, + signatures_rx, + operator_state_avs, + total_stake_per_quorum, + quorum_threshold_percentage_map, + quorum_apks_g1, + quorum_nums, + window_duration, ) .await } @@ -336,6 +394,7 @@ impl BlsAggregatorService quorum_threshold_percentage_map: HashMap, quorum_apks_g1: Vec, quorum_nums: Vec, + window_duration: Duration, ) -> Result<(), BlsAggregationServiceError> { let mut aggregated_operators: HashMap, AggregatedOperators> = HashMap::new(); let mut open_window = false; @@ -455,7 +514,7 @@ impl BlsAggregatorService open_window = true; let sender_cloned = window_tx.clone(); tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(window_duration).await; let _ = sender_cloned.send(true); }); } From fee57fe9d144a6a1530238ce809d567e97395836 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 29 Oct 2024 16:24:16 -0300 Subject: [PATCH 03/14] Removed duplicated code --- .../services/bls_aggregation/src/bls_agg.rs | 37 ++----------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 92d2a9b5..40845a37 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -279,45 +279,16 @@ impl BlsAggregatorService >, signatures_rx: UnboundedReceiver, ) -> Result<(), BlsAggregationServiceError> { - let quorum_threshold_percentage_map: HashMap = quorum_nums - .iter() - .enumerate() - .map(|(i, quorum_number)| (*quorum_number, quorum_threshold_percentages[i])) - .collect(); - - let operator_state_avs = avs_registry_service - .get_operators_avs_state_at_block(task_created_block, &quorum_nums) - .await - .map_err(|_| BlsAggregationServiceError::RegistryError)?; - - let quorums_avs_state = avs_registry_service - .get_quorums_avs_state_at_block(&quorum_nums, task_created_block) - .await - .map_err(|_| BlsAggregationServiceError::RegistryError)?; - let total_stake_per_quorum: HashMap<_, _> = quorums_avs_state - .iter() - .map(|(k, v)| (*k, v.total_stake)) - .collect(); - - let quorum_apks_g1: Vec = quorum_nums - .iter() - .filter_map(|quorum_num| quorums_avs_state.get(quorum_num)) - .map(|avs_state| avs_state.agg_pub_key_g1.clone()) - .collect(); - - Self::loop_task_aggregator( + Self::single_task_aggregator_with_window( avs_registry_service, task_index, task_created_block, + quorum_nums, + quorum_threshold_percentages, time_to_expiry, aggregated_response_sender, signatures_rx, - operator_state_avs, - total_stake_per_quorum, - quorum_threshold_percentage_map, - quorum_apks_g1, - quorum_nums, - Duration::from_secs(2), + Duration::ZERO, ) .await } From 70fa7b21ad9a9a180bb065a6b1c31506896af1f9 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 30 Oct 2024 10:29:01 -0300 Subject: [PATCH 04/14] initialize_new_task_with_window function implemented --- .../services/bls_aggregation/src/bls_agg.rs | 78 +++++++++++-------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 40845a37..d4f9b560 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -123,14 +123,50 @@ impl BlsAggregatorService quorum_threshold_percentages: QuorumThresholdPercentages, time_to_expiry: Duration, ) -> Result<(), BlsAggregationServiceError> { - let mut task_channel = self.signed_task_response.write(); + self.initialize_new_task_with_window( + task_index, + task_created_block, + quorum_nums, + quorum_threshold_percentages, + time_to_expiry, + Duration::ZERO, + ) + .await + } - if task_channel.contains_key(&task_index) { - return Err(BlsAggregationServiceError::DuplicateTaskIndex); - } + /// Creates a new task meant to process new signed task responses for a task tokio channel. + /// + /// # Arguments + /// + /// * `task_index` - The index of the task + /// * `task_created_block` - The block number at which the task was created + /// * `quorum_nums` - The quorum numbers for the task + /// * `quorum_threshold_percentages` - The quorum threshold percentages for the task + /// * `time_to_expiry` - The timemetout for the task reader to expire + /// + /// # Error + /// + /// Returns error if the task index already exists + pub async fn initialize_new_task_with_window( + &self, + task_index: TaskIndex, + task_created_block: u32, + quorum_nums: Vec, + quorum_threshold_percentages: QuorumThresholdPercentages, + time_to_expiry: Duration, + window_duration: Duration, + ) -> Result<(), BlsAggregationServiceError> { + let signatures_rx = { + let mut task_channel = self.signed_task_response.write(); + + if task_channel.contains_key(&task_index) { + return Err(BlsAggregationServiceError::DuplicateTaskIndex); + } - let (tx, rx) = mpsc::unbounded_channel(); - task_channel.insert(task_index, tx); + let (signatures_tx, signatures_rx) = mpsc::unbounded_channel(); + task_channel.insert(task_index, signatures_tx); + signatures_rx + }; let avs_registry_service = self.avs_registry_service.clone(); let aggregated_response_sender = self.aggregated_response_sender.clone(); @@ -144,7 +180,8 @@ impl BlsAggregatorService quorum_threshold_percentages.clone(), time_to_expiry, aggregated_response_sender, - rx, + signatures_rx, + window_duration, ) .await .inspect_err(|err| { @@ -278,33 +315,6 @@ impl BlsAggregatorService Result, >, signatures_rx: UnboundedReceiver, - ) -> Result<(), BlsAggregationServiceError> { - Self::single_task_aggregator_with_window( - avs_registry_service, - task_index, - task_created_block, - quorum_nums, - quorum_threshold_percentages, - time_to_expiry, - aggregated_response_sender, - signatures_rx, - Duration::ZERO, - ) - .await - } - - #[allow(clippy::too_many_arguments)] - pub async fn single_task_aggregator_with_window( - avs_registry_service: A, - task_index: TaskIndex, - task_created_block: u32, - quorum_nums: Vec, - quorum_threshold_percentages: QuorumThresholdPercentages, - time_to_expiry: Duration, - aggregated_response_sender: UnboundedSender< - Result, - >, - signatures_rx: UnboundedReceiver, window_duration: Duration, ) -> Result<(), BlsAggregationServiceError> { let quorum_threshold_percentage_map: HashMap = quorum_nums From 6d9ced69166253535babb81ddfdf962031711d72 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 30 Oct 2024 15:23:30 -0300 Subject: [PATCH 05/14] test_1_quorum_2_operators_window draft --- .../bls_aggregation/src/bls_agg_test.rs | 201 ++++++++++++++++++ 1 file changed, 201 insertions(+) diff --git a/crates/services/bls_aggregation/src/bls_agg_test.rs b/crates/services/bls_aggregation/src/bls_agg_test.rs index 692f3ea3..7740b2ac 100644 --- a/crates/services/bls_aggregation/src/bls_agg_test.rs +++ b/crates/services/bls_aggregation/src/bls_agg_test.rs @@ -652,4 +652,205 @@ pub mod integration_test { .await .unwrap(); } + + // Test with time window: wait to get more signatures + #[tokio::test] + async fn test_1_quorum_2_operators_window() { + let (container, http_endpoint, ws_endpoint) = start_anvil_container().await; + + let registry_coordinator_address = + get_registry_coordinator_address(http_endpoint.clone()).await; + let operator_state_retriever_address = + get_operator_state_retriever_address(http_endpoint.clone()).await; + let service_manager_address = get_service_manager_address(http_endpoint.clone()).await; + let provider = get_provider(http_endpoint.as_str()); + let salt: FixedBytes<32> = FixedBytes::from([0x02; 32]); + + let bls_key_pair_1 = BlsKeyPair::new(BLS_KEY_1.to_string()).unwrap(); + let operator_id_1 = + hex!("fd329fe7e54f459b9c104064efe0172db113a50b5f394949b4ef80b3c34ca7f5").into(); + + let bls_key_pair_2 = BlsKeyPair::new(BLS_KEY_2.to_string()).unwrap(); + let operator_id_2 = + hex!("7213614953817d00866957a5f866c67a5fb8d4e392af501701f7ab35294dc4b3").into(); + + let quorum_nums = Bytes::from([1u8]); + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100]; + + let contract_registry_coordinator = RegistryCoordinator::new( + registry_coordinator_address, + get_signer(PRIVATE_KEY_1, http_endpoint.as_str()), + ); + + // Create quorum + let operator_set_params = OperatorSetParam { + maxOperatorCount: 10, + kickBIPsOfOperatorStake: 100, + kickBIPsOfTotalStake: 1000, + }; + let strategy_params = vec![StrategyParams { + strategy: get_erc20_mock_strategy(http_endpoint.clone()).await, + multiplier: U96::from(1), + }]; + let _ = contract_registry_coordinator + .createQuorum( + operator_set_params.clone(), + U96::from(0), + strategy_params.clone(), + ) + .send() + .await + .unwrap(); + + // Create avs clients to interact with contracts deployed on anvil + let avs_registry_reader = AvsRegistryChainReader::new( + get_test_logger(), + registry_coordinator_address, + operator_state_retriever_address, + http_endpoint.clone(), + ) + .await + .unwrap(); + + let avs_writer = AvsRegistryChainWriter::build_avs_registry_chain_writer( + get_test_logger(), + http_endpoint.clone(), + PRIVATE_KEY_1.to_string(), + registry_coordinator_address, + operator_state_retriever_address, + ) + .await + .unwrap(); + let operators_info = OperatorInfoServiceInMemory::new( + get_test_logger(), + avs_registry_reader.clone(), + ws_endpoint, + ) + .await; + + let current_block_num = provider.get_block_number().await.unwrap(); + let cancellation_token = CancellationToken::new(); + let operators_info_clone = operators_info.clone(); + let token_clone = cancellation_token.clone(); + + task::spawn(async move { + operators_info_clone + .start_service(&token_clone, 0, current_block_num) + .await + }); + + // Register operator + avs_writer + .register_operator_in_quorum_with_avs_registry_coordinator( + bls_key_pair_1.clone(), + salt, + U256::from_be_slice(&[0xff; 32]), + quorum_nums.clone(), + "socket".to_string(), + ) + .await + .unwrap(); + + let avs_writer = AvsRegistryChainWriter::build_avs_registry_chain_writer( + get_test_logger(), + http_endpoint, + PRIVATE_KEY_2.to_string(), + registry_coordinator_address, + operator_state_retriever_address, + ) + .await + .unwrap(); + avs_writer + .register_operator_in_quorum_with_avs_registry_coordinator( + bls_key_pair_2.clone(), + salt, + U256::from_be_slice(&[0xff; 32]), + quorum_nums.clone(), + "socket".to_string(), + ) + .await + .unwrap(); + + // Sleep is needed so registered operators are accesible to the OperatorInfoServiceInMemory + sleep(Duration::from_secs(3)).await; + + // Create aggregation service + let avs_registry_service = + AvsRegistryServiceChainCaller::new(avs_registry_reader.clone(), operators_info); + + let bls_agg_service = BlsAggregatorService::new(avs_registry_service); + + let current_block_num = provider.get_block_number().await.unwrap(); + + mine_anvil_blocks(&container, 1).await; + + // Create the task related parameters + let task_index: TaskIndex = 0; + let time_to_expiry = Duration::from_secs(3); + + // Initialize the task + bls_agg_service + .initialize_new_task_with_window( + task_index, + current_block_num as u32, + quorum_nums.to_vec(), + quorum_threshold_percentages, + time_to_expiry, + Duration::from_secs(2), + ) + .await + .unwrap(); + + // Compute the signature and send it to the aggregation service + let task_response = 123; + let task_response_digest = hash(task_response); + + let bls_signature_1 = bls_key_pair_1.sign_message(task_response_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_digest, + bls_signature_1, + operator_id_1, + ) + .await + .unwrap(); + + let bls_signature_2 = bls_key_pair_2.sign_message(task_response_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_digest, + bls_signature_2, + operator_id_2, + ) + .await + .unwrap(); + + // Wait for the response from the aggregation service + let bls_agg_response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await + .unwrap() + .unwrap(); + + // Send the shutdown signal to the OperatorInfoServiceInMemory + cancellation_token.cancel(); + + // Check the response + let service_manager = IBLSSignatureChecker::new(service_manager_address, provider); + service_manager + .checkSignatures( + task_response_digest, + quorum_nums, + current_block_num as u32, + agg_response_to_non_signer_stakes_and_signature(bls_agg_response), + ) + .call() + .await + .unwrap(); + } } From 5d676de5219d49cffb6af2f9869f1d39575b6579 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 16:40:08 -0300 Subject: [PATCH 06/14] add test signatures are processed during window --- .../services/bls_aggregation/src/bls_agg.rs | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index d4f9b560..75862de0 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -674,6 +674,7 @@ mod tests { use std::collections::HashMap; use std::time::Duration; use std::vec; + use tokio::time::Instant; const PRIVATE_KEY_1: &str = "13710126902690889134622698668747132666439281256983827313388062967626731803599"; @@ -2034,4 +2035,137 @@ mod tests { response.unwrap() ); } + + #[tokio::test] + async fn test_signatures_are_processed_during_window_after_quorum() { + let test_operator_1 = TestOperator { + operator_id: U256::from(1).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(), + }; + let test_operator_2 = TestOperator { + operator_id: U256::from(2).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(), + }; + let test_operator_3 = TestOperator { + operator_id: U256::from(3).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_3.into()).unwrap(), + }; + let test_operators = vec![ + test_operator_1.clone(), + test_operator_2.clone(), + test_operator_3.clone(), + ]; + let block_number = 1; + let task_index = 0; + let task_response = 123; + let quorum_numbers: Vec = vec![0]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![67_u8]; + let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); + let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); + + let time_to_expiry = Duration::from_secs(5); + let window_duration = Duration::from_secs(1); + + let start = Instant::now(); + bls_agg_service + .initialize_new_task_with_window( + task_index, + block_number as u32, + quorum_numbers, + quorum_threshold_percentages, + time_to_expiry, + window_duration, + ) + .await + .unwrap(); + + let task_response_1_digest = hash(task_response); + let bls_sig_op_1 = test_operator_1 + .bls_keypair + .sign_message(task_response_1_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_1_digest, + bls_sig_op_1.clone(), + test_operator_1.operator_id, + ) + .await + .unwrap(); + + let task_response_2_digest = hash(task_response); + let bls_sig_op_2 = test_operator_2 + .bls_keypair + .sign_message(task_response_2_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_2_digest, + bls_sig_op_2.clone(), + test_operator_2.operator_id, + ) + .await + .unwrap(); + + // quorum reached here, window should be open receiving signatures for 1 second + + let task_response_3_digest = hash(task_response); + let bls_sig_op_3 = test_operator_3 + .bls_keypair + .sign_message(task_response_3_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_3_digest, + bls_sig_op_3.clone(), + test_operator_3.operator_id, + ) + .await + .unwrap(); + + let signers_apk_g2 = aggregate_g2_public_keys(&vec![ + test_operator_1.clone(), + test_operator_2.clone(), + test_operator_3.clone(), + ]); + let signers_agg_sig_g1 = + aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2, bls_sig_op_3]); + let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ + test_operator_1, + test_operator_2, + test_operator_3, + ])]; + + let expected_agg_service_response = BlsAggregationServiceResponse { + task_index, + task_response_digest: task_response_3_digest, + non_signers_pub_keys_g1: vec![], + quorum_apks_g1, + signers_apk_g2, + signers_agg_sig_g1, + non_signer_quorum_bitmap_indices: vec![], + quorum_apk_indices: vec![], + total_stake_indices: vec![], + non_signer_stake_indices: vec![], + }; + + let response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await; + + let elapsed = start.elapsed(); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); + assert!(elapsed < time_to_expiry); + assert!(elapsed >= window_duration); + } } From db973c17a8b5666550671553ee67a0fda6314ef9 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:02:10 -0300 Subject: [PATCH 07/14] add test expiration during window --- .../services/bls_aggregation/src/bls_agg.rs | 122 +++++++++++++++++- 1 file changed, 117 insertions(+), 5 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 75862de0..d8b67b37 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -387,6 +387,7 @@ impl BlsAggregatorService loop { tokio::select! { _ = &mut task_expired_timer => { + dbg!("task expired"); // Task expired. If window is open, send aggregated reponse. Else, send error if open_window { aggregated_response_sender @@ -397,8 +398,9 @@ impl BlsAggregatorService } return Ok(()); }, - _ = window_rx.recv() => { + window_finished = window_rx.recv() => { // Window finished. Send aggregated response + dbg!("Window rx received", window_finished); aggregated_response_sender .send(Ok(current_aggregated_response.unwrap())) .map_err(|_| BlsAggregationServiceError::ChannelError)?; @@ -406,7 +408,7 @@ impl BlsAggregatorService }, signed_task_digest = signatures_rx.recv() =>{ // New signature, aggregate it. If threshold is met, start window - + dbg!("received signature"); let Some(digest) = signed_task_digest else { return Err(BlsAggregationServiceError::SignatureChannelClosed); }; @@ -488,6 +490,7 @@ impl BlsAggregatorService &total_stake_per_quorum, &quorum_threshold_percentage_map, ) { + dbg!("threshold not met"); continue; } @@ -649,6 +652,8 @@ impl BlsAggregatorService ) else { return false; }; + dbg!("signed_stake_by_quorum", signed_stake_by_quorum); + dbg!("total_stake_by_quorum", total_stake_by_quorum); let signed_stake = signed_stake_by_quorum * U256::from(100); let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage); @@ -657,6 +662,7 @@ impl BlsAggregatorService return false; } } + dbg!("thresholds met"); true } } @@ -674,7 +680,7 @@ mod tests { use std::collections::HashMap; use std::time::Duration; use std::vec; - use tokio::time::Instant; + use tokio::time::{sleep, Instant}; const PRIVATE_KEY_1: &str = "13710126902690889134622698668747132666439281256983827313388062967626731803599"; @@ -2062,7 +2068,7 @@ mod tests { let task_index = 0; let task_response = 123; let quorum_numbers: Vec = vec![0]; - let quorum_threshold_percentages: QuorumThresholdPercentages = vec![67_u8]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![50_u8]; let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); @@ -2111,7 +2117,7 @@ mod tests { .unwrap(); // quorum reached here, window should be open receiving signatures for 1 second - + sleep(Duration::from_millis(500)).await; let task_response_3_digest = hash(task_response); let bls_sig_op_3 = test_operator_3 .bls_keypair @@ -2168,4 +2174,110 @@ mod tests { assert!(elapsed < time_to_expiry); assert!(elapsed >= window_duration); } + + #[tokio::test] + async fn test_if_quorum_has_been_reached_and_the_task_expires_during_window_the_response_is_sent( + ) { + let test_operator_1 = TestOperator { + operator_id: U256::from(1).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(), + }; + let test_operator_2 = TestOperator { + operator_id: U256::from(2).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(), + }; + let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()]; + let block_number = 1; + let task_index = 0; + let task_response = 123; + let quorum_numbers: Vec = vec![0]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8]; + let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); + let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); + + let time_to_expiry = Duration::from_secs(2); + let window_duration = Duration::from_secs(10); + + let start = Instant::now(); + bls_agg_service + .initialize_new_task_with_window( + task_index, + block_number as u32, + quorum_numbers, + quorum_threshold_percentages, + time_to_expiry, + window_duration, + ) + .await + .unwrap(); + + let task_response_1_digest = hash(task_response); + let bls_sig_op_1 = test_operator_1 + .bls_keypair + .sign_message(task_response_1_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_1_digest, + bls_sig_op_1.clone(), + test_operator_1.operator_id, + ) + .await + .unwrap(); + + // quorum reached here, window should be open receiving signatures + + let task_response_2_digest = hash(task_response); + let bls_sig_op_2 = test_operator_2 + .bls_keypair + .sign_message(task_response_2_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_2_digest, + bls_sig_op_2.clone(), + test_operator_2.operator_id, + ) + .await + .unwrap(); + + let signers_apk_g2 = + aggregate_g2_public_keys(&vec![test_operator_1.clone(), test_operator_2.clone()]); + let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1, bls_sig_op_2]); + let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ + test_operator_1, + test_operator_2, + ])]; + + let expected_agg_service_response = BlsAggregationServiceResponse { + task_index, + task_response_digest: task_response_2_digest, + non_signers_pub_keys_g1: vec![], + quorum_apks_g1, + signers_apk_g2, + signers_agg_sig_g1, + non_signer_quorum_bitmap_indices: vec![], + quorum_apk_indices: vec![], + total_stake_indices: vec![], + non_signer_stake_indices: vec![], + }; + + let response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await; + + let elapsed = start.elapsed(); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); + assert!(elapsed >= time_to_expiry); + assert!(elapsed < window_duration); + } } From 3875054f3a411096630d5c1cb99bda2fc401e67e Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:17:16 -0300 Subject: [PATCH 08/14] add test no aggregation with window of duration zero --- .../services/bls_aggregation/src/bls_agg.rs | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index d8b67b37..66c51526 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -2280,4 +2280,111 @@ mod tests { assert!(elapsed >= time_to_expiry); assert!(elapsed < window_duration); } + + #[tokio::test] + async fn test_if_window_duration_is_zero_no_signatures_are_aggregated_after_reaching_quorum() { + let test_operator_1 = TestOperator { + operator_id: U256::from(1).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(), + }; + let test_operator_2 = TestOperator { + operator_id: U256::from(2).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(), + }; + let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()]; + let block_number = 1; + let task_index = 0; + let task_response = 123; + let quorum_numbers: Vec = vec![0]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8]; + let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); + let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); + + let time_to_expiry = Duration::from_secs(2); + let window_duration = Duration::ZERO; + + let start = Instant::now(); + bls_agg_service + .initialize_new_task_with_window( + task_index, + block_number as u32, + quorum_numbers, + quorum_threshold_percentages, + time_to_expiry, + window_duration, + ) + .await + .unwrap(); + + let task_response_1_digest = hash(task_response); + let bls_sig_op_1 = test_operator_1 + .bls_keypair + .sign_message(task_response_1_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_1_digest, + bls_sig_op_1.clone(), + test_operator_1.operator_id, + ) + .await + .unwrap(); + + // quorum reached here but window duration is zero, so no more signatures should be aggregated + sleep(Duration::from_millis(1)).await; + + let task_response_2_digest = hash(task_response); + let bls_sig_op_2 = test_operator_2 + .bls_keypair + .sign_message(task_response_2_digest.as_ref()); + let process_signature_result = bls_agg_service + .process_new_signature( + task_index, + task_response_2_digest, + bls_sig_op_2.clone(), + test_operator_2.operator_id, + ) + .await; + assert_eq!( + Err(BlsAggregationServiceError::ChannelError), // TODO: change this error to be more representative + process_signature_result + ); + + let signers_apk_g2 = aggregate_g2_public_keys(&vec![test_operator_1.clone()]); + let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]); + let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ + test_operator_1, + test_operator_2.clone(), + ])]; + + let expected_agg_service_response = BlsAggregationServiceResponse { + task_index, + task_response_digest: task_response_1_digest, + non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], + quorum_apks_g1, + signers_apk_g2, + signers_agg_sig_g1, + non_signer_quorum_bitmap_indices: vec![], + quorum_apk_indices: vec![], + total_stake_indices: vec![], + non_signer_stake_indices: vec![], + }; + + let response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await; + + let elapsed = start.elapsed(); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); + assert!(elapsed < time_to_expiry); + } } From 425d05a0c7898098cfe6810f6a1093931aeed206 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:23:06 -0300 Subject: [PATCH 09/14] add test no aggregation after window --- .../services/bls_aggregation/src/bls_agg.rs | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 66c51526..5fa0d8e9 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -2387,4 +2387,111 @@ mod tests { assert_eq!(task_index, response.unwrap().unwrap().task_index); assert!(elapsed < time_to_expiry); } + + #[tokio::test] + async fn test_no_signatures_are_aggregated_after_window() { + let test_operator_1 = TestOperator { + operator_id: U256::from(1).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_1.into()).unwrap(), + }; + let test_operator_2 = TestOperator { + operator_id: U256::from(2).into(), + stake_per_quorum: HashMap::from([(0u8, U256::from(100))]), + bls_keypair: BlsKeyPair::new(PRIVATE_KEY_2.into()).unwrap(), + }; + let test_operators = vec![test_operator_1.clone(), test_operator_2.clone()]; + let block_number = 1; + let task_index = 0; + let task_response = 123; + let quorum_numbers: Vec = vec![0]; + let quorum_threshold_percentages: QuorumThresholdPercentages = vec![40_u8]; + let fake_avs_registry_service = FakeAvsRegistryService::new(block_number, test_operators); + let bls_agg_service = BlsAggregatorService::new(fake_avs_registry_service); + + let time_to_expiry = Duration::from_secs(5); + let window_duration = Duration::from_secs(1); + + let start = Instant::now(); + bls_agg_service + .initialize_new_task_with_window( + task_index, + block_number as u32, + quorum_numbers, + quorum_threshold_percentages, + time_to_expiry, + window_duration, + ) + .await + .unwrap(); + + let task_response_1_digest = hash(task_response); + let bls_sig_op_1 = test_operator_1 + .bls_keypair + .sign_message(task_response_1_digest.as_ref()); + bls_agg_service + .process_new_signature( + task_index, + task_response_1_digest, + bls_sig_op_1.clone(), + test_operator_1.operator_id, + ) + .await + .unwrap(); + + // quorum reached here, window should be open for 1 second + sleep(Duration::from_secs(2)).await; + + let task_response_2_digest = hash(task_response); + let bls_sig_op_2 = test_operator_2 + .bls_keypair + .sign_message(task_response_2_digest.as_ref()); + let process_signature_result = bls_agg_service + .process_new_signature( + task_index, + task_response_2_digest, + bls_sig_op_2.clone(), + test_operator_2.operator_id, + ) + .await; + assert_eq!( + Err(BlsAggregationServiceError::ChannelError), // TODO: change this error to be more representative + process_signature_result + ); + + let signers_apk_g2 = aggregate_g2_public_keys(&vec![test_operator_1.clone()]); + let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]); + let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ + test_operator_1, + test_operator_2.clone(), + ])]; + + let expected_agg_service_response = BlsAggregationServiceResponse { + task_index, + task_response_digest: task_response_1_digest, + non_signers_pub_keys_g1: vec![test_operator_2.bls_keypair.public_key()], + quorum_apks_g1, + signers_apk_g2, + signers_agg_sig_g1, + non_signer_quorum_bitmap_indices: vec![], + quorum_apk_indices: vec![], + total_stake_indices: vec![], + non_signer_stake_indices: vec![], + }; + + let response = bls_agg_service + .aggregated_response_receiver + .lock() + .await + .recv() + .await; + + let elapsed = start.elapsed(); + assert_eq!( + expected_agg_service_response, + response.clone().unwrap().unwrap() + ); + assert_eq!(task_index, response.unwrap().unwrap().task_index); + assert!(elapsed < time_to_expiry); + } } From 0f2c3352b667050a3d642dcdc57e6b6e0797dfb4 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:23:31 -0300 Subject: [PATCH 10/14] remove dbg --- crates/services/bls_aggregation/src/bls_agg.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 5fa0d8e9..5c1f1521 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -387,7 +387,6 @@ impl BlsAggregatorService loop { tokio::select! { _ = &mut task_expired_timer => { - dbg!("task expired"); // Task expired. If window is open, send aggregated reponse. Else, send error if open_window { aggregated_response_sender @@ -400,7 +399,6 @@ impl BlsAggregatorService }, window_finished = window_rx.recv() => { // Window finished. Send aggregated response - dbg!("Window rx received", window_finished); aggregated_response_sender .send(Ok(current_aggregated_response.unwrap())) .map_err(|_| BlsAggregationServiceError::ChannelError)?; @@ -408,7 +406,6 @@ impl BlsAggregatorService }, signed_task_digest = signatures_rx.recv() =>{ // New signature, aggregate it. If threshold is met, start window - dbg!("received signature"); let Some(digest) = signed_task_digest else { return Err(BlsAggregationServiceError::SignatureChannelClosed); }; @@ -490,7 +487,6 @@ impl BlsAggregatorService &total_stake_per_quorum, &quorum_threshold_percentage_map, ) { - dbg!("threshold not met"); continue; } @@ -652,8 +648,6 @@ impl BlsAggregatorService ) else { return false; }; - dbg!("signed_stake_by_quorum", signed_stake_by_quorum); - dbg!("total_stake_by_quorum", total_stake_by_quorum); let signed_stake = signed_stake_by_quorum * U256::from(100); let threshold_stake = *total_stake_by_quorum * U256::from(*quorum_threshold_percentage); @@ -662,7 +656,6 @@ impl BlsAggregatorService return false; } } - dbg!("thresholds met"); true } } From ee70f1d68f96bcf0e317bda134414a1a28acab32 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:25:43 -0300 Subject: [PATCH 11/14] clippy --- crates/services/bls_aggregation/src/bls_agg.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 5c1f1521..9f848c3c 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -2345,7 +2345,7 @@ mod tests { process_signature_result ); - let signers_apk_g2 = aggregate_g2_public_keys(&vec![test_operator_1.clone()]); + let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]); let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]); let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ test_operator_1, @@ -2452,7 +2452,7 @@ mod tests { process_signature_result ); - let signers_apk_g2 = aggregate_g2_public_keys(&vec![test_operator_1.clone()]); + let signers_apk_g2 = aggregate_g2_public_keys(&[test_operator_1.clone()]); let signers_agg_sig_g1 = aggregate_g1_signatures(&[bls_sig_op_1]); let quorum_apks_g1 = vec![aggregate_g1_public_keys(&vec![ test_operator_1, From a0274e62154a6f3c5decfc48fe74942b8ffb3da4 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 17:28:18 -0300 Subject: [PATCH 12/14] clippy --- crates/services/bls_aggregation/src/bls_agg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 9f848c3c..70726c41 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -397,7 +397,7 @@ impl BlsAggregatorService } return Ok(()); }, - window_finished = window_rx.recv() => { + _ = window_rx.recv() => { // Window finished. Send aggregated response aggregated_response_sender .send(Ok(current_aggregated_response.unwrap())) From ce93b56da9aa5f9078c31de172601fd2fe0eccc6 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 18:13:37 -0300 Subject: [PATCH 13/14] docs --- crates/services/bls_aggregation/src/bls_agg.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 70726c41..cfd08bd3 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -110,7 +110,7 @@ impl BlsAggregatorService /// * `task_created_block` - The block number at which the task was created /// * `quorum_nums` - The quorum numbers for the task /// * `quorum_threshold_percentages` - The quorum threshold percentages for the task - /// * `time_to_expiry` - The timemetout for the task reader to expire + /// * `time_to_expiry` - The timeout for the task reader to expire /// /// # Error /// @@ -142,7 +142,8 @@ impl BlsAggregatorService /// * `task_created_block` - The block number at which the task was created /// * `quorum_nums` - The quorum numbers for the task /// * `quorum_threshold_percentages` - The quorum threshold percentages for the task - /// * `time_to_expiry` - The timemetout for the task reader to expire + /// * `time_to_expiry` - The timeout for the task reader to expire + /// * `window_duration` - The duration of the window to wait for signatures after quorum is reached /// /// # Error /// From 19b4360f4ccb90083b38607aa92101a00d413868 Mon Sep 17 00:00:00 2001 From: tomasarrachea Date: Wed, 30 Oct 2024 18:26:41 -0300 Subject: [PATCH 14/14] remove draft test --- .../bls_aggregation/src/bls_agg_test.rs | 201 ------------------ 1 file changed, 201 deletions(-) diff --git a/crates/services/bls_aggregation/src/bls_agg_test.rs b/crates/services/bls_aggregation/src/bls_agg_test.rs index 7740b2ac..692f3ea3 100644 --- a/crates/services/bls_aggregation/src/bls_agg_test.rs +++ b/crates/services/bls_aggregation/src/bls_agg_test.rs @@ -652,205 +652,4 @@ pub mod integration_test { .await .unwrap(); } - - // Test with time window: wait to get more signatures - #[tokio::test] - async fn test_1_quorum_2_operators_window() { - let (container, http_endpoint, ws_endpoint) = start_anvil_container().await; - - let registry_coordinator_address = - get_registry_coordinator_address(http_endpoint.clone()).await; - let operator_state_retriever_address = - get_operator_state_retriever_address(http_endpoint.clone()).await; - let service_manager_address = get_service_manager_address(http_endpoint.clone()).await; - let provider = get_provider(http_endpoint.as_str()); - let salt: FixedBytes<32> = FixedBytes::from([0x02; 32]); - - let bls_key_pair_1 = BlsKeyPair::new(BLS_KEY_1.to_string()).unwrap(); - let operator_id_1 = - hex!("fd329fe7e54f459b9c104064efe0172db113a50b5f394949b4ef80b3c34ca7f5").into(); - - let bls_key_pair_2 = BlsKeyPair::new(BLS_KEY_2.to_string()).unwrap(); - let operator_id_2 = - hex!("7213614953817d00866957a5f866c67a5fb8d4e392af501701f7ab35294dc4b3").into(); - - let quorum_nums = Bytes::from([1u8]); - let quorum_threshold_percentages: QuorumThresholdPercentages = vec![100]; - - let contract_registry_coordinator = RegistryCoordinator::new( - registry_coordinator_address, - get_signer(PRIVATE_KEY_1, http_endpoint.as_str()), - ); - - // Create quorum - let operator_set_params = OperatorSetParam { - maxOperatorCount: 10, - kickBIPsOfOperatorStake: 100, - kickBIPsOfTotalStake: 1000, - }; - let strategy_params = vec![StrategyParams { - strategy: get_erc20_mock_strategy(http_endpoint.clone()).await, - multiplier: U96::from(1), - }]; - let _ = contract_registry_coordinator - .createQuorum( - operator_set_params.clone(), - U96::from(0), - strategy_params.clone(), - ) - .send() - .await - .unwrap(); - - // Create avs clients to interact with contracts deployed on anvil - let avs_registry_reader = AvsRegistryChainReader::new( - get_test_logger(), - registry_coordinator_address, - operator_state_retriever_address, - http_endpoint.clone(), - ) - .await - .unwrap(); - - let avs_writer = AvsRegistryChainWriter::build_avs_registry_chain_writer( - get_test_logger(), - http_endpoint.clone(), - PRIVATE_KEY_1.to_string(), - registry_coordinator_address, - operator_state_retriever_address, - ) - .await - .unwrap(); - let operators_info = OperatorInfoServiceInMemory::new( - get_test_logger(), - avs_registry_reader.clone(), - ws_endpoint, - ) - .await; - - let current_block_num = provider.get_block_number().await.unwrap(); - let cancellation_token = CancellationToken::new(); - let operators_info_clone = operators_info.clone(); - let token_clone = cancellation_token.clone(); - - task::spawn(async move { - operators_info_clone - .start_service(&token_clone, 0, current_block_num) - .await - }); - - // Register operator - avs_writer - .register_operator_in_quorum_with_avs_registry_coordinator( - bls_key_pair_1.clone(), - salt, - U256::from_be_slice(&[0xff; 32]), - quorum_nums.clone(), - "socket".to_string(), - ) - .await - .unwrap(); - - let avs_writer = AvsRegistryChainWriter::build_avs_registry_chain_writer( - get_test_logger(), - http_endpoint, - PRIVATE_KEY_2.to_string(), - registry_coordinator_address, - operator_state_retriever_address, - ) - .await - .unwrap(); - avs_writer - .register_operator_in_quorum_with_avs_registry_coordinator( - bls_key_pair_2.clone(), - salt, - U256::from_be_slice(&[0xff; 32]), - quorum_nums.clone(), - "socket".to_string(), - ) - .await - .unwrap(); - - // Sleep is needed so registered operators are accesible to the OperatorInfoServiceInMemory - sleep(Duration::from_secs(3)).await; - - // Create aggregation service - let avs_registry_service = - AvsRegistryServiceChainCaller::new(avs_registry_reader.clone(), operators_info); - - let bls_agg_service = BlsAggregatorService::new(avs_registry_service); - - let current_block_num = provider.get_block_number().await.unwrap(); - - mine_anvil_blocks(&container, 1).await; - - // Create the task related parameters - let task_index: TaskIndex = 0; - let time_to_expiry = Duration::from_secs(3); - - // Initialize the task - bls_agg_service - .initialize_new_task_with_window( - task_index, - current_block_num as u32, - quorum_nums.to_vec(), - quorum_threshold_percentages, - time_to_expiry, - Duration::from_secs(2), - ) - .await - .unwrap(); - - // Compute the signature and send it to the aggregation service - let task_response = 123; - let task_response_digest = hash(task_response); - - let bls_signature_1 = bls_key_pair_1.sign_message(task_response_digest.as_ref()); - bls_agg_service - .process_new_signature( - task_index, - task_response_digest, - bls_signature_1, - operator_id_1, - ) - .await - .unwrap(); - - let bls_signature_2 = bls_key_pair_2.sign_message(task_response_digest.as_ref()); - bls_agg_service - .process_new_signature( - task_index, - task_response_digest, - bls_signature_2, - operator_id_2, - ) - .await - .unwrap(); - - // Wait for the response from the aggregation service - let bls_agg_response = bls_agg_service - .aggregated_response_receiver - .lock() - .await - .recv() - .await - .unwrap() - .unwrap(); - - // Send the shutdown signal to the OperatorInfoServiceInMemory - cancellation_token.cancel(); - - // Check the response - let service_manager = IBLSSignatureChecker::new(service_manager_address, provider); - service_manager - .checkSignatures( - task_response_digest, - quorum_nums, - current_block_num as u32, - agg_response_to_non_signer_stakes_and_signature(bls_agg_response), - ) - .call() - .await - .unwrap(); - } }