Skip to content

Commit

Permalink
Relayer balance metrics (#2976)
Browse files Browse the repository at this point in the history
Done:
- scaffolding for fetching custom agent metrics
- abstractions for building a metrics fetcher for a given VM
- querying cosmos balances; e2e tested.
- querying evm balances; e2e tested.
- **Note that as a result, evm addresses are now no longer zero-padded
when printed in the logs. This may break existing log queries**
- fixed a nasty bug on ubuntu where wasmd (osmosisd dependency, part of
the grpc query flow) would panic when a block is specified via
`x-cosmos-block-height`. The fix was to bump the version of osmosisd
from `19.0.0` to `20.5.0`. **Note that when running e2e on Mac OS, the
osmosis version in use is still 19.0.0**. That's because we need a fork
that publishes a darwin target binary (currently pointing
[here](https://github.com/hashableric/osmosis/releases/download/v19.0.0-mnts/osmosisd-19.0.0-mnts-darwin-arm64.tar.gz))

For follow up PR:
- sealevel balance querying

I'm open to all renaming suggestions, I just tried to speed through and
didn't ponder names too much

Relates to hyperlane-xyz/issues#701
Closes hyperlane-xyz/issues#702 (because the
balance becomes available in the metrics endpoint for polling)
  • Loading branch information
daniel-savu authored Dec 5, 2023
1 parent 2da6cce commit 77aa58c
Show file tree
Hide file tree
Showing 43 changed files with 639 additions and 356 deletions.
3 changes: 3 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 53 additions & 20 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
metrics::{AgentMetrics, AgentMetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -49,7 +53,7 @@ struct ContextKey {
#[derive(AsRef)]
pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
Expand All @@ -67,6 +71,8 @@ pub struct Relayer {
transaction_gas_limit: Option<U256>,
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
}

impl Debug for Relayer {
Expand All @@ -92,11 +98,15 @@ impl BaseAgent for Relayer {

type Settings = RelayerSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
Expand All @@ -105,18 +115,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -126,7 +136,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -136,7 +146,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand Down Expand Up @@ -188,9 +198,10 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut destination_chains = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();

destination_chains.insert(destination.clone(), destination_chain_setup.clone());
let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
Expand Down Expand Up @@ -221,7 +232,7 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
Expand All @@ -230,7 +241,7 @@ impl BaseAgent for Relayer {
Ok(Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
destination_chains,
msg_ctxs,
core,
message_syncs,
Expand All @@ -242,6 +253,8 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
})
}

Expand All @@ -251,12 +264,32 @@ impl BaseAgent for Relayer {

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for destination in &self.destination_chains {
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
send_channels.insert(destination.id(), send_channel);
send_channels.insert(dest_domain.id(), send_channel);

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

tasks.push(self.run_destination_submitter(destination, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
}

for origin in &self.origin_chains {
Expand Down Expand Up @@ -330,11 +363,11 @@ impl Relayer {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.iter(),
self.destination_chains.keys(),
);
let destination_ctxs = self
.destination_chains
.iter()
.keys()
.filter(|&destination| destination != origin)
.map(|destination| {
(
Expand Down
8 changes: 6 additions & 2 deletions rust/agents/scraper/migration/bin/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::env;
use std::{env, time::Duration};

use migration::sea_orm::{Database, DatabaseConnection};
pub use migration::{DbErr, Migrator, MigratorTrait as _};
use sea_orm::ConnectOptions;

const LOCAL_DATABASE_URL: &str = "postgresql://postgres:47221c18c610@localhost:5432/postgres";
const CONNECT_TIMEOUT: u64 = 20;

pub fn url() -> String {
env::var("DATABASE_URL").unwrap_or_else(|_| LOCAL_DATABASE_URL.into())
Expand All @@ -16,6 +18,8 @@ pub async fn init() -> Result<DatabaseConnection, DbErr> {
.init();

let url = url();
let mut options: ConnectOptions = url.clone().into();
options.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT));
println!("Connecting to {url}");
Database::connect(url).await
Database::connect(options).await
}
5 changes: 3 additions & 2 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -38,6 +38,7 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
Expand Down
7 changes: 6 additions & 1 deletion rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -51,7 +52,11 @@ impl BaseAgent for Validator {

type Settings = ValidatorSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
Expand Down
15 changes: 10 additions & 5 deletions rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use crate::{
address::CosmosAddress,
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::aggregate_ism::{ModulesAndThresholdRequest, ModulesAndThresholdResponse},
ConnectionConf, CosmosProvider, Signer,
};
Expand All @@ -18,7 +18,7 @@ use tracing::instrument;
pub struct CosmosAggregationIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: Box<CosmosProvider>,
}

impl CosmosAggregationIsm {
Expand All @@ -28,7 +28,12 @@ impl CosmosAggregationIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
Expand All @@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
self.provider.clone()
}
}

Expand All @@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm {
) -> ChainResult<(Vec<H256>, u8)> {
let payload = ModulesAndThresholdRequest::new(message);

let data = self.provider.wasm_query(payload, None).await?;
let data = self.provider.grpc().wasm_query(payload, None).await?;
let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?;

let modules: ChainResult<Vec<H256>> = response
Expand Down
12 changes: 9 additions & 3 deletions rust/chains/hyperlane-cosmos/src/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use once_cell::sync::Lazy;
use std::ops::RangeInclusive;

use crate::{
grpc::WasmGrpcProvider,
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer},
signers::Signer,
utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64},
Expand All @@ -22,6 +21,7 @@ use crate::{
pub struct CosmosInterchainGasPaymaster {
domain: HyperlaneDomain,
address: H256,
provider: CosmosProvider,
}

impl HyperlaneContract for CosmosInterchainGasPaymaster {
Expand All @@ -36,7 +36,7 @@ impl HyperlaneChain for CosmosInterchainGasPaymaster {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}

Expand All @@ -49,11 +49,17 @@ impl CosmosInterchainGasPaymaster {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider,
})
}
}
Expand Down
Loading

0 comments on commit 77aa58c

Please sign in to comment.