Skip to content

Commit

Permalink
feat: add agent metrics fetchers
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Nov 28, 2023
1 parent 30323f1 commit 42d887e
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 57 deletions.
69 changes: 47 additions & 22 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{
self,
agent::{AgentMetrics, Metrics, MetricsFetcher},
},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -92,11 +97,15 @@ impl BaseAgent for Relayer {

type Settings = RelayerSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: Metrics,
) -> Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
Expand All @@ -105,18 +114,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -126,7 +135,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -136,7 +145,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand Down Expand Up @@ -188,20 +197,29 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut metrics_fetchers = vec![];
// let mut custom_metrics = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();
let agent_metrics_conf = destination_chain_setup
.agent_metrics_conf("relayer".to_owned())
.await?;
println!("~~~ agent metrics: {:?}", agent_metrics_conf);
println!("~~~ agent signer: {:?}", destination_chain_setup.signer);
// custom_metrics.insert(
// destination.id(),
// destination_chain_setup
// .metrics(destination)
// .expect("Missing metrics config"),
// );
// PrometheusAgent
let metrics_fetcher = destination_chain_setup
.build_agent_metrics_fetcher()
.await?;
let agent_metrics =
AgentMetrics::new(agent_metrics.clone(), agent_metrics_conf, metrics_fetcher);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetricsFetcher"));
metrics_fetchers.push(fetcher_task);

let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
Expand Down Expand Up @@ -232,13 +250,13 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
}

Ok(Self {
let relayer = Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
Expand All @@ -253,12 +271,19 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
})
};

Ok((relayer, metrics_fetchers))
}

#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
async fn run(
self,
metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initially set to the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
Expand Down
29 changes: 19 additions & 10 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::agent::{Metrics as AgentMetrics, MetricsFetcher},
run_all,
settings::IndexSettings,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -38,7 +40,8 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
) -> eyre::Result<Self>
agent_metrics: AgentMetrics,
) -> eyre::Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
Expand Down Expand Up @@ -73,16 +76,22 @@ impl BaseAgent for Scraper {

trace!(domain_count = scrapers.len(), "Created scrapers");

Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
})
Ok((
Self {
core,
metrics,
contract_sync_metrics,
scrapers,
},
Default::default(),
))
}

#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn run(
self,
_metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);
Expand Down
22 changes: 17 additions & 5 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::agent::{Metrics as AgentMetrics, MetricsFetcher},
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -51,7 +52,11 @@ impl BaseAgent for Validator {

type Settings = ValidatorSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
Expand Down Expand Up @@ -88,7 +93,7 @@ impl BaseAgent for Validator {
.await?
.into();

Ok(Self {
let validator = Self {
origin_chain: settings.origin_chain,
core,
db: msg_db,
Expand All @@ -101,12 +106,19 @@ impl BaseAgent for Validator {
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
})
};

Ok((validator, Default::default()))
}

#[allow(clippy::async_yields_async)]
async fn run(mut self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
async fn run(
mut self,
metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initially set to the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;

if let Some(signer_instance) = self.signer_instance.take() {
tasks.push(
Expand Down
3 changes: 2 additions & 1 deletion rust/chains/hyperlane-ethereum/src/trait_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ethers::prelude::{
Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider,
SignerMiddleware, WeightedProvider, Ws, WsClientError,
};
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use reqwest::{Client, Url};
use thiserror::Error;

Expand All @@ -27,7 +28,6 @@ use hyperlane_core::{
use crate::{signers::Signers, ConnectionConf, FallbackProvider, RetryingProvider};

// This should be whatever the prometheus scrape interval is
const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60);
const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(60);

/// An error when connecting to an ethereum provider.
Expand Down Expand Up @@ -194,6 +194,7 @@ pub trait BuildableWithProvider {

Ok(if let Some(metrics) = metrics {
let provider = Arc::new(PrometheusMiddleware::new(provider, metrics.0, metrics.1));
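// TODO: the metrics-updater spawn below has to be moved out of this provider builder
// (presumably into the agents' metrics fetcher tasks — confirm before relocating).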
tokio::spawn(provider.start_updating_on_interval(METRICS_SCRAPE_INTERVAL));
self.build_with_signer(provider, locator, signer).await?
} else {
Expand Down
25 changes: 20 additions & 5 deletions rust/hyperlane-base/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ use hyperlane_core::config::*;
use tokio::task::JoinHandle;
use tracing::{debug_span, instrument::Instrumented, Instrument};

use crate::{metrics::CoreMetrics, settings::Settings};
use crate::{
metrics::{
agent::{create_agent_metrics, Metrics as AgentMetrics, MetricsFetcher},
CoreMetrics,
},
settings::Settings,
};

/// Properties shared across all hyperlane agents
#[derive(Debug)]
Expand Down Expand Up @@ -36,13 +42,20 @@ pub trait BaseAgent: Send + Sync + Debug {
type Settings: LoadableFromSettings;

/// Instantiate the agent from the standard settings object
async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized;

/// Start running this agent.
#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<Result<()>>>;
async fn run(
self,
metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<Result<()>>>;
}

/// Call this from `main` to fully initialize and run the agent for its entire
Expand All @@ -68,10 +81,12 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {

let metrics = settings.as_ref().metrics(A::AGENT_NAME)?;
core_settings.tracing.start_tracing(&metrics)?;
let agent = A::from_settings(settings, metrics.clone()).await?;
let agent_metrics = create_agent_metrics(&metrics)?;
let (agent, metrics_fetchers) =
A::from_settings(settings, metrics.clone(), agent_metrics).await?;
metrics.run_http_server();

agent.run().await.await?
agent.run(metrics_fetchers).await.await?
}

/// Utility to run multiple tasks and shutdown if any one task ends.
Expand Down
2 changes: 1 addition & 1 deletion rust/hyperlane-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub mod settings;
mod agent;
pub use agent::*;

mod metrics;
pub mod metrics;
pub use metrics::*;

mod contract_sync;
Expand Down
Loading

0 comments on commit 42d887e

Please sign in to comment.