From d9762b27d2ceeb741e750bd3ca1c1df4872e8646 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Thu, 4 Jan 2024 18:09:22 +0000 Subject: [PATCH] wip: cosmos fallback provider --- .../src/rpc_clients/fallback.rs | 98 +++++++++++-------- .../hyperlane-ethereum/src/trait_builder.rs | 51 +++++++++- rust/hyperlane-core/src/lib.rs | 2 +- 3 files changed, 107 insertions(+), 44 deletions(-) diff --git a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs index af1dd28ce25..75a0f61e19c 100644 --- a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs +++ b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs @@ -1,4 +1,5 @@ use derive_new::new; +use hyperlane_core::{error::HyperlaneCustomError, ChainCommunicationError}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -16,9 +17,9 @@ use tracing::{info, instrument, warn_span}; use ethers_prometheus::json_rpc_client::PrometheusJsonRpcClientConfigExt; use crate::rpc_clients::{categorize_client_response, CategorizedResponse}; +use crate::BlockNumberGetter; const MAX_BLOCK_TIME: Duration = Duration::from_secs(2 * 60); -const BLOCK_NUMBER_RPC: &str = "eth_blockNumber"; #[derive(Clone, Copy, new)] struct PrioritizedProviderInner { @@ -45,6 +46,8 @@ struct PrioritizedProviders { priorities: RwLock>, } +// : Into> + /// A provider that bundles multiple providers and attempts to call the first, /// then the second, and so on until a response is received. pub struct FallbackProvider { @@ -90,27 +93,51 @@ where } } -impl FallbackProvider { +impl FallbackProvider +where + T: Debug + Into> + Clone, +{ /// Convenience method for creating a `FallbackProviderBuilder` with same /// `JsonRpcClient` types - pub fn builder() -> FallbackProviderBuilder { - FallbackProviderBuilder::default() + pub fn builder(block_number_getter: Arc) -> FallbackProviderBuilder { + FallbackProviderBuilder::new(block_number_getter) } /// Create a new fallback provider - pub fn new(providers: impl IntoIterator) -> Self { - Self::builder().add_providers(providers).build() + pub fn new( + providers: impl IntoIterator, + block_number_getter: Arc, + ) -> Self { + Self::builder(block_number_getter) + .add_providers(providers) + .build() + } + + async fn deprioritize_provider(&self, priority: PrioritizedProviderInner) { + // De-prioritize the current provider by moving it to the end of the queue + let mut priorities = self.inner.priorities.write().await; + priorities.retain(|&p| p.index != priority.index); + priorities.push(priority); + } + + async fn update_last_seen_block(&self, provider_index: usize, current_block_height: u64) { + let mut priorities = self.inner.priorities.write().await; + // Get provider position in the up-to-date priorities vec + if let Some(position) = priorities.iter().position(|p| p.index == provider_index) { + priorities[position] = + PrioritizedProviderInner::from_block_height(provider_index, current_block_height); + } + } + + async fn take_priorities_snapshot(&self) -> Vec { + let read_lock = self.inner.priorities.read().await; + (*read_lock).clone() } -} -impl FallbackProvider -where - C: JsonRpcClient, -{ async fn handle_stalled_provider( &self, priority: &PrioritizedProviderInner, - provider: &C, + provider: &T, ) -> Result<(), ProviderError> { let now = Instant::now(); if now @@ -121,17 +148,18 @@ where return Ok(()); } - let current_block_height: u64 = provider - .request(BLOCK_NUMBER_RPC, ()) + // let prov: T = provider.clone(); + let block_getter: Box = (provider.clone()).into(); + let current_block_height = block_getter + .get() .await - .map(|r: U64| r.as_u64()) .unwrap_or(priority.last_block_height.0); if current_block_height <= priority.last_block_height.0 { // The `max_block_time` elapsed but the block number returned by the provider has not increased self.deprioritize_provider(*priority).await; info!( provider_index=%priority.index, - ?provider, + provider=?self.inner.providers[priority.index], "Deprioritizing an inner provider in FallbackProvider", ); } else { @@ -140,46 +168,27 @@ where } Ok(()) } - - async fn deprioritize_provider(&self, priority: PrioritizedProviderInner) { - // De-prioritize the current provider by moving it to the end of the queue - let mut priorities = self.inner.priorities.write().await; - priorities.retain(|&p| p.index != priority.index); - priorities.push(priority); - } - - async fn update_last_seen_block(&self, provider_index: usize, current_block_height: u64) { - let mut priorities = self.inner.priorities.write().await; - // Get provider position in the up-to-date priorities vec - if let Some(position) = priorities.iter().position(|p| p.index == provider_index) { - priorities[position] = - PrioritizedProviderInner::from_block_height(provider_index, current_block_height); - } - } - - async fn take_priorities_snapshot(&self) -> Vec { - let read_lock = self.inner.priorities.read().await; - (*read_lock).clone() - } } +impl FallbackProvider where C: JsonRpcClient {} + /// Builder to create a new fallback provider. #[derive(Debug, Clone)] pub struct FallbackProviderBuilder { providers: Vec, max_block_time: Duration, + block_number_getter: Arc, } -impl Default for FallbackProviderBuilder { - fn default() -> Self { +impl FallbackProviderBuilder { + pub fn new(block_number_getter: Arc) -> Self { Self { providers: Vec::new(), max_block_time: MAX_BLOCK_TIME, + block_number_getter, } } -} -impl FallbackProviderBuilder { /// Add a new provider to the set. Each new provider will be a lower /// priority than the previous. pub fn add_provider(mut self, provider: T) -> Self { @@ -236,7 +245,10 @@ impl From for ProviderError { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl JsonRpcClient for FallbackProvider where - C: JsonRpcClient + PrometheusJsonRpcClientConfigExt, + C: JsonRpcClient + + PrometheusJsonRpcClientConfigExt + + Clone + + Into>, { type Error = ProviderError; @@ -281,6 +293,8 @@ where #[cfg(test)] mod tests { + use crate::BLOCK_NUMBER_RPC; + use super::*; use std::sync::Mutex; diff --git a/rust/chains/hyperlane-ethereum/src/trait_builder.rs b/rust/chains/hyperlane-ethereum/src/trait_builder.rs index 89e4f31d4f8..f27d4bda796 100644 --- a/rust/chains/hyperlane-ethereum/src/trait_builder.rs +++ b/rust/chains/hyperlane-ethereum/src/trait_builder.rs @@ -1,4 +1,4 @@ -use std::fmt::Write; +use std::fmt::{Debug, Write}; use std::sync::Arc; use std::time::Duration; @@ -10,6 +10,8 @@ use ethers::prelude::{ Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider, SignerMiddleware, WeightedProvider, Ws, WsClientError, }; +use ethers::providers::ProviderError; +use ethers_core::types::U64; use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL; use reqwest::{Client, Url}; use thiserror::Error; @@ -278,3 +280,50 @@ where }; Ok(GasOracleMiddleware::new(provider, gas_oracle)) } + +pub const BLOCK_NUMBER_RPC: &str = "eth_blockNumber"; + +pub struct JsonRpcBlockGetter<'a, T: JsonRpcClient>(&'a T); + +impl<'a, C> JsonRpcBlockGetter<'a, C> +where + C: JsonRpcClient, + ::Error: Into, +{ + async fn get_block_number(&self) -> Result { + let res = self + .0 + .request(BLOCK_NUMBER_RPC, ()) + .await + .map(|r: U64| r.as_u64()) + .map_err(Into::into)?; + Ok(res) + } +} + +#[async_trait] +pub trait BlockNumberGetter: Send + Sync + Debug { + async fn get(&self) -> Result; +} + +#[async_trait] +impl BlockNumberGetter for PrometheusJsonRpcClient +where + C: JsonRpcClient, +{ + async fn get(&self) -> Result { + let res = self + .0 + .request(BLOCK_NUMBER_RPC, ()) + .await + .map(|r: U64| r.as_u64()) + .map_err(Into::into)?; + Ok(res) + } +} + +impl Into> for PrometheusJsonRpcClient { + fn into(self) -> Box { + Box::new(JsonRpcBlockGetter(&self.0)) + } +} diff --git a/rust/hyperlane-core/src/lib.rs b/rust/hyperlane-core/src/lib.rs index 6834df39511..7b9c519f871 100644 --- a/rust/hyperlane-core/src/lib.rs +++ b/rust/hyperlane-core/src/lib.rs @@ -33,7 +33,7 @@ pub mod metrics; mod types; mod chain; -mod error; +pub mod error; /// Enum for validity of a list of messages #[derive(Debug)]