Skip to content

Commit

Permalink
wip: cosmos fallback provider
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Jan 4, 2024
1 parent 80f1a56 commit d9762b2
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 44 deletions.
98 changes: 56 additions & 42 deletions rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use derive_new::new;
use hyperlane_core::{error::HyperlaneCustomError, ChainCommunicationError};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -16,9 +17,9 @@ use tracing::{info, instrument, warn_span};
use ethers_prometheus::json_rpc_client::PrometheusJsonRpcClientConfigExt;

use crate::rpc_clients::{categorize_client_response, CategorizedResponse};
use crate::BlockNumberGetter;

const MAX_BLOCK_TIME: Duration = Duration::from_secs(2 * 60);
const BLOCK_NUMBER_RPC: &str = "eth_blockNumber";

#[derive(Clone, Copy, new)]
struct PrioritizedProviderInner {
Expand All @@ -45,6 +46,8 @@ struct PrioritizedProviders<T> {
priorities: RwLock<Vec<PrioritizedProviderInner>>,
}

// : Into<Box<dyn BlockNumberGetter>>

/// A provider that bundles multiple providers and attempts to call the first,
/// then the second, and so on until a response is received.
pub struct FallbackProvider<T> {
Expand Down Expand Up @@ -90,27 +93,51 @@ where
}
}

impl<T> FallbackProvider<T> {
impl<T> FallbackProvider<T>
where
T: Debug + Into<Box<dyn BlockNumberGetter>> + Clone,
{
/// Convenience method for creating a `FallbackProviderBuilder` with same
/// `JsonRpcClient` types
pub fn builder() -> FallbackProviderBuilder<T> {
FallbackProviderBuilder::default()
pub fn builder(block_number_getter: Arc<dyn BlockNumberGetter>) -> FallbackProviderBuilder<T> {
FallbackProviderBuilder::new(block_number_getter)
}

/// Create a new fallback provider
pub fn new(providers: impl IntoIterator<Item = T>) -> Self {
Self::builder().add_providers(providers).build()
pub fn new(
providers: impl IntoIterator<Item = T>,
block_number_getter: Arc<dyn BlockNumberGetter>,
) -> Self {
Self::builder(block_number_getter)
.add_providers(providers)
.build()
}

async fn deprioritize_provider(&self, priority: PrioritizedProviderInner) {
// De-prioritize the current provider by moving it to the end of the queue
let mut priorities = self.inner.priorities.write().await;
priorities.retain(|&p| p.index != priority.index);
priorities.push(priority);
}

async fn update_last_seen_block(&self, provider_index: usize, current_block_height: u64) {
let mut priorities = self.inner.priorities.write().await;
// Get provider position in the up-to-date priorities vec
if let Some(position) = priorities.iter().position(|p| p.index == provider_index) {
priorities[position] =
PrioritizedProviderInner::from_block_height(provider_index, current_block_height);
}
}

async fn take_priorities_snapshot(&self) -> Vec<PrioritizedProviderInner> {
let read_lock = self.inner.priorities.read().await;
(*read_lock).clone()
}
}

impl<C> FallbackProvider<C>
where
C: JsonRpcClient,
{
async fn handle_stalled_provider(
&self,
priority: &PrioritizedProviderInner,
provider: &C,
provider: &T,
) -> Result<(), ProviderError> {
let now = Instant::now();
if now
Expand All @@ -121,17 +148,18 @@ where
return Ok(());
}

let current_block_height: u64 = provider
.request(BLOCK_NUMBER_RPC, ())
// let prov: T = provider.clone();
let block_getter: Box<dyn BlockNumberGetter> = (provider.clone()).into();
let current_block_height = block_getter
.get()
.await
.map(|r: U64| r.as_u64())
.unwrap_or(priority.last_block_height.0);
if current_block_height <= priority.last_block_height.0 {
// The `max_block_time` elapsed but the block number returned by the provider has not increased
self.deprioritize_provider(*priority).await;
info!(
provider_index=%priority.index,
?provider,
provider=?self.inner.providers[priority.index],
"Deprioritizing an inner provider in FallbackProvider",
);
} else {
Expand All @@ -140,46 +168,27 @@ where
}
Ok(())
}

async fn deprioritize_provider(&self, priority: PrioritizedProviderInner) {
// De-prioritize the current provider by moving it to the end of the queue
let mut priorities = self.inner.priorities.write().await;
priorities.retain(|&p| p.index != priority.index);
priorities.push(priority);
}

async fn update_last_seen_block(&self, provider_index: usize, current_block_height: u64) {
let mut priorities = self.inner.priorities.write().await;
// Get provider position in the up-to-date priorities vec
if let Some(position) = priorities.iter().position(|p| p.index == provider_index) {
priorities[position] =
PrioritizedProviderInner::from_block_height(provider_index, current_block_height);
}
}

async fn take_priorities_snapshot(&self) -> Vec<PrioritizedProviderInner> {
let read_lock = self.inner.priorities.read().await;
(*read_lock).clone()
}
}

impl<C> FallbackProvider<C> where C: JsonRpcClient {}

/// Builder to create a new fallback provider.
#[derive(Debug, Clone)]
pub struct FallbackProviderBuilder<T> {
providers: Vec<T>,
max_block_time: Duration,
block_number_getter: Arc<dyn BlockNumberGetter>,
}

impl<T> Default for FallbackProviderBuilder<T> {
fn default() -> Self {
impl<T> FallbackProviderBuilder<T> {
pub fn new(block_number_getter: Arc<dyn BlockNumberGetter>) -> Self {
Self {
providers: Vec::new(),
max_block_time: MAX_BLOCK_TIME,
block_number_getter,
}
}
}

impl<T> FallbackProviderBuilder<T> {
/// Add a new provider to the set. Each new provider will be a lower
/// priority than the previous.
pub fn add_provider(mut self, provider: T) -> Self {
Expand Down Expand Up @@ -236,7 +245,10 @@ impl From<FallbackError> for ProviderError {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C> JsonRpcClient for FallbackProvider<C>
where
C: JsonRpcClient<Error = HttpClientError> + PrometheusJsonRpcClientConfigExt,
C: JsonRpcClient<Error = HttpClientError>
+ PrometheusJsonRpcClientConfigExt
+ Clone
+ Into<Box<dyn BlockNumberGetter>>,
{
type Error = ProviderError;

Expand Down Expand Up @@ -281,6 +293,8 @@ where

#[cfg(test)]
mod tests {
use crate::BLOCK_NUMBER_RPC;

use super::*;
use std::sync::Mutex;

Expand Down
51 changes: 50 additions & 1 deletion rust/chains/hyperlane-ethereum/src/trait_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Write;
use std::fmt::{Debug, Write};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -10,6 +10,8 @@ use ethers::prelude::{
Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider,
SignerMiddleware, WeightedProvider, Ws, WsClientError,
};
use ethers::providers::ProviderError;
use ethers_core::types::U64;
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use reqwest::{Client, Url};
use thiserror::Error;
Expand Down Expand Up @@ -278,3 +280,50 @@ where
};
Ok(GasOracleMiddleware::new(provider, gas_oracle))
}

pub const BLOCK_NUMBER_RPC: &str = "eth_blockNumber";

pub struct JsonRpcBlockGetter<'a, T: JsonRpcClient>(&'a T);

impl<'a, C> JsonRpcBlockGetter<'a, C>
where
C: JsonRpcClient,
<C as JsonRpcClient>::Error: Into<ProviderError>,
{
async fn get_block_number(&self) -> Result<u64, ProviderError> {
let res = self
.0
.request(BLOCK_NUMBER_RPC, ())
.await
.map(|r: U64| r.as_u64())
.map_err(Into::into)?;
Ok(res)
}
}

#[async_trait]
pub trait BlockNumberGetter: Send + Sync + Debug {
async fn get(&self) -> Result<u64, ProviderError>;
}

#[async_trait]
impl<C> BlockNumberGetter for PrometheusJsonRpcClient<C>
where
C: JsonRpcClient,
{
async fn get(&self) -> Result<u64, ProviderError> {
let res = self
.0
.request(BLOCK_NUMBER_RPC, ())
.await
.map(|r: U64| r.as_u64())
.map_err(Into::into)?;
Ok(res)
}
}

impl Into<Box<dyn BlockNumberGetter>> for PrometheusJsonRpcClient<Http> {
fn into(self) -> Box<dyn BlockNumberGetter> {
Box::new(JsonRpcBlockGetter(&self.0))
}
}
2 changes: 1 addition & 1 deletion rust/hyperlane-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub mod metrics;
mod types;

mod chain;
mod error;
pub mod error;

/// Enum for validity of a list of messages
#[derive(Debug)]
Expand Down

0 comments on commit d9762b2

Please sign in to comment.