From 4d5b9f23f3058798a4d0aeeaaea2cef133d976a6 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 17:41:56 +0100 Subject: [PATCH 1/8] move bench run+persist logic around --- bench/src/lib.rs | 3 +- ...service_adapter.rs => service_adapter1.rs} | 0 bench/src/service_adapter_new.rs | 18 ++++ benchrunner-service/src/main.rs | 97 +++++++++++++++---- .../src/postgres/metrics_dbstore.rs | 62 ++++++------ .../src/prometheus/metrics_prometheus.rs | 2 +- 6 files changed, 128 insertions(+), 54 deletions(-) rename bench/src/{service_adapter.rs => service_adapter1.rs} (100%) create mode 100644 bench/src/service_adapter_new.rs diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 83587d7c..2ababe48 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -26,7 +26,8 @@ pub mod bench1; pub mod benches; pub mod helpers; pub mod metrics; -pub mod service_adapter; +pub mod service_adapter1; +pub mod service_adapter_new; pub mod tx_size; #[derive(Parser, Debug)] diff --git a/bench/src/service_adapter.rs b/bench/src/service_adapter1.rs similarity index 100% rename from bench/src/service_adapter.rs rename to bench/src/service_adapter1.rs diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs new file mode 100644 index 00000000..eb5a841d --- /dev/null +++ b/bench/src/service_adapter_new.rs @@ -0,0 +1,18 @@ +use solana_sdk::signature::Keypair; +use tokio::time::Instant; +use crate::metrics::Metric; +use crate::service_adapter1::BenchConfig; +use crate::tx_size::TxSize; + +pub async fn benchnew_confirmation_rate_servicerunner( + bench_config: &BenchConfig, + rpc_addr: String, + funded_payer: Keypair, + size_tx: TxSize, +) -> Metric { + let started_at = Instant::now(); + + + + todo!() +} diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 4e45a0fa..557ec897 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -3,23 +3,29 @@ mod cli; mod postgres; mod prometheus; -use crate::args::{get_funded_payer_from_env, read_tenant_configs}; +use crate::args::{get_funded_payer_from_env, read_tenant_configs, TenantConfig}; use crate::cli::Args; use crate::postgres::metrics_dbstore::{ - save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus, + upsert_benchrun_status, BenchRunStatus, }; use crate::postgres::postgres_session::PostgresSessionConfig; use crate::postgres::postgres_session_cache::PostgresSessionCache; use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus; use crate::prometheus::prometheus_sync::PrometheusSync; -use bench::service_adapter::BenchConfig; +use bench::service_adapter1::BenchConfig; use clap::Parser; use futures_util::future::join_all; use itertools::Itertools; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use std::net::SocketAddr; use std::str::FromStr; +use std::sync::Arc; use std::time::{Duration, SystemTime}; +use async_trait::async_trait; +use postgres_types::ToSql; +use solana_sdk::signature::Keypair; +use bench::metrics::Metric; +use bench::tx_size::TxSize; #[tokio::main] async fn main() { @@ -36,7 +42,7 @@ async fn main() { let bench_interval = Duration::from_millis(bench_interval); - let funded_payer = get_funded_payer_from_env(); + let funded_payer = Arc::new(get_funded_payer_from_env()); let tenant_configs = read_tenant_configs(std::env::vars().collect::>()); @@ -76,8 +82,10 @@ async fn main() { }) .collect_vec(); + // 1 task per tenant - each task will perform bench runs in sequence + // (the slot comparison bench is done somewhere else) for tenant_config in &tenant_configs { - let funded_payer = funded_payer.insecure_clone(); + let funded_payer = funded_payer.clone(); let tenant_id = tenant_config.tenant_id.clone(); let postgres_session = postgres_session.clone(); let tenant_config = tenant_config.clone(); @@ -92,6 +100,14 @@ async fn main() { ); let benchrun_at = SystemTime::now(); + let bench_impl = BenchRunnerOldBenchImpl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + size_tx, + }; + if let Some(postgres_session) = postgres_session.as_ref() { let _dbstatus = upsert_benchrun_status( postgres_session, @@ -103,23 +119,29 @@ async fn main() { .await; } - let metric = bench::service_adapter::bench_servicerunner( - &bench_config, - tenant_config.rpc_addr.clone(), - funded_payer.insecure_clone(), - size_tx, - ) - .await; + let metric = bench_impl.run_bench().await; + // let metric = bench::service_adapter1::bench_servicerunner( + // &bench_config, + // tenant_config.rpc_addr.clone(), + // funded_payer.insecure_clone(), + // size_tx, + // ) + // .await; if let Some(postgres_session) = postgres_session.as_ref() { - let _dbstatus = save_metrics_to_postgres( - postgres_session, - &tenant_config, - &bench_config, - &metric, - benchrun_at, - ) - .await; + // let _dbstatus = save_metrics_to_postgres( + // postgres_session, + // &tenant_config, + // &bench_config, + // &metric, + // benchrun_at, + // ) + // .await; + let save_result = bench_impl.try_save_results_postgres(&metric, postgres_session).await; + if let Err(err) = save_result { + warn!("Failed to save metrics to postgres (err {:?}) - continue", err); + } + } publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await; @@ -147,3 +169,36 @@ async fn main() { join_all(jh_tenant_task).await; } + +// R: result +#[async_trait] +trait BenchRunner { + async fn run_bench(&self) -> M; +} + +// R: result +#[async_trait] +trait BenchMetricsPostgresSaver { + async fn try_save_results_postgres(&self, metric: &M, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>; +} + +struct BenchRunnerOldBenchImpl { + pub benchrun_at: SystemTime, + pub tenant_config: TenantConfig, + pub bench_config: BenchConfig, + pub funded_payer: Arc, + pub size_tx: TxSize, +} + +#[async_trait] +impl BenchRunner for BenchRunnerOldBenchImpl { + async fn run_bench(&self) -> Metric { + bench::service_adapter1::bench_servicerunner( + &self.bench_config, + self.tenant_config.rpc_addr.clone(), + self.funded_payer.insecure_clone(), + self.size_tx, + ) + .await + } +} diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index a5cb6cb3..3d65d8fc 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -1,10 +1,12 @@ use crate::args::TenantConfig; use crate::postgres::postgres_session_cache::PostgresSessionCache; use bench::metrics::Metric; -use bench::service_adapter::BenchConfig; +use bench::service_adapter1::BenchConfig; use log::warn; use postgres_types::ToSql; use std::time::SystemTime; +use async_trait::async_trait; +use crate::{BenchMetricsPostgresSaver, BenchRunner, BenchRunnerOldBenchImpl}; #[allow(clippy::upper_case_acronyms)] pub enum BenchRunStatus { @@ -57,29 +59,27 @@ pub async fn upsert_benchrun_status( Ok(()) } -pub async fn save_metrics_to_postgres( - postgres_session: &PostgresSessionCache, - tenant_config: &TenantConfig, - bench_config: &BenchConfig, - metric: &Metric, - benchrun_at: SystemTime, -) -> anyhow::Result<()> { - let metricjson = serde_json::to_value(metric).unwrap(); - let values: &[&(dyn ToSql + Sync)] = &[ - &tenant_config.tenant_id, - &benchrun_at, - &(bench_config.cu_price_micro_lamports as i64), - &(metric.txs_sent as i64), - &(metric.txs_confirmed as i64), - &(metric.txs_un_confirmed as i64), - &(metric.average_confirmation_time_ms as f32), - &metricjson, - ]; - let write_result = postgres_session - .get_session() - .await? - .execute( - r#" + + +#[async_trait] +impl BenchMetricsPostgresSaver for BenchRunnerOldBenchImpl { + async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { + let metricjson = serde_json::to_value(metric).unwrap(); + let values: &[&(dyn ToSql + Sync)] = &[ + &self.tenant_config.tenant_id, + &self.benchrun_at, + &(self.bench_config.cu_price_micro_lamports as i64), + &(metric.txs_sent as i64), + &(metric.txs_confirmed as i64), + &(metric.txs_un_confirmed as i64), + &(metric.average_confirmation_time_ms as f32), + &metricjson, + ]; + postgres_session + .get_session() + .await? + .execute( + r#" INSERT INTO benchrunner.bench_metrics ( tenant, @@ -92,13 +92,13 @@ pub async fn save_metrics_to_postgres( ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, - values, - ) - .await; + values, + ) + .await?; - if let Err(err) = write_result { - warn!("Failed to insert metrics (err {:?}) - continue", err); - } - Ok(()) + Ok(()) + } } + + diff --git a/benchrunner-service/src/prometheus/metrics_prometheus.rs b/benchrunner-service/src/prometheus/metrics_prometheus.rs index 8d250659..ea88b89b 100644 --- a/benchrunner-service/src/prometheus/metrics_prometheus.rs +++ b/benchrunner-service/src/prometheus/metrics_prometheus.rs @@ -1,7 +1,7 @@ use bench::metrics::Metric; use crate::args::TenantConfig; -use bench::service_adapter::BenchConfig; +use bench::service_adapter1::BenchConfig; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; // https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs From fd3fe80336ced68671a3d05ffddef00658d72465 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 18:00:10 +0100 Subject: [PATCH 2/8] introduce traits --- benchrunner-service/src/main.rs | 77 +++++++++++++------ .../src/postgres/metrics_dbstore.rs | 11 ++- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 557ec897..768c7d5d 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -100,12 +100,22 @@ async fn main() { ); let benchrun_at = SystemTime::now(); - let bench_impl = BenchRunnerOldBenchImpl { - benchrun_at, - tenant_config: tenant_config.clone(), - bench_config: bench_config.clone(), - funded_payer: funded_payer.clone(), - size_tx, + let bench_impl: Box> = if true { + Box::new(BenchRunnerBench1Impl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + size_tx, + }) + } else { + Box::new(BenchRunnerConfirmationRateImpl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + size_tx, + }) }; if let Some(postgres_session) = postgres_session.as_ref() { @@ -120,23 +130,8 @@ async fn main() { } let metric = bench_impl.run_bench().await; - // let metric = bench::service_adapter1::bench_servicerunner( - // &bench_config, - // tenant_config.rpc_addr.clone(), - // funded_payer.insecure_clone(), - // size_tx, - // ) - // .await; if let Some(postgres_session) = postgres_session.as_ref() { - // let _dbstatus = save_metrics_to_postgres( - // postgres_session, - // &tenant_config, - // &bench_config, - // &metric, - // benchrun_at, - // ) - // .await; let save_result = bench_impl.try_save_results_postgres(&metric, postgres_session).await; if let Err(err) = save_result { warn!("Failed to save metrics to postgres (err {:?}) - continue", err); @@ -172,17 +167,19 @@ async fn main() { // R: result #[async_trait] -trait BenchRunner { +trait BenchRunner: Send + Sync + 'static { async fn run_bench(&self) -> M; } +trait BenchTrait: BenchRunner + BenchMetricsPostgresSaver {} + // R: result #[async_trait] -trait BenchMetricsPostgresSaver { +trait BenchMetricsPostgresSaver: Send + Sync + 'static { async fn try_save_results_postgres(&self, metric: &M, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>; } -struct BenchRunnerOldBenchImpl { +struct BenchRunnerBench1Impl { pub benchrun_at: SystemTime, pub tenant_config: TenantConfig, pub bench_config: BenchConfig, @@ -190,8 +187,11 @@ struct BenchRunnerOldBenchImpl { pub size_tx: TxSize, } +impl BenchTrait for BenchRunnerBench1Impl {} + + #[async_trait] -impl BenchRunner for BenchRunnerOldBenchImpl { +impl BenchRunner for BenchRunnerBench1Impl { async fn run_bench(&self) -> Metric { bench::service_adapter1::bench_servicerunner( &self.bench_config, @@ -202,3 +202,30 @@ impl BenchRunner for BenchRunnerOldBenchImpl { .await } } + + +impl BenchTrait for BenchRunnerConfirmationRateImpl {} + +struct BenchRunnerConfirmationRateImpl { + pub benchrun_at: SystemTime, + pub tenant_config: TenantConfig, + pub bench_config: BenchConfig, + pub funded_payer: Arc, + pub size_tx: TxSize, +} + +#[async_trait] +impl BenchRunner for BenchRunnerConfirmationRateImpl { + async fn run_bench(&self) -> Metric { + bench::service_adapter_new::benchnew_confirmation_rate_servicerunner( + &self.bench_config, + self.tenant_config.rpc_addr.clone(), + self.funded_payer.insecure_clone(), + self.size_tx, + ) + .await + } +} + + + diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index 3d65d8fc..9df4a869 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -6,7 +6,7 @@ use log::warn; use postgres_types::ToSql; use std::time::SystemTime; use async_trait::async_trait; -use crate::{BenchMetricsPostgresSaver, BenchRunner, BenchRunnerOldBenchImpl}; +use crate::{BenchMetricsPostgresSaver, BenchRunner, BenchRunnerBench1Impl, BenchRunnerConfirmationRateImpl}; #[allow(clippy::upper_case_acronyms)] pub enum BenchRunStatus { @@ -62,7 +62,7 @@ pub async fn upsert_benchrun_status( #[async_trait] -impl BenchMetricsPostgresSaver for BenchRunnerOldBenchImpl { +impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { let metricjson = serde_json::to_value(metric).unwrap(); let values: &[&(dyn ToSql + Sync)] = &[ @@ -102,3 +102,10 @@ impl BenchMetricsPostgresSaver for BenchRunnerOldBenchImpl { } +#[async_trait] +impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { + async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { + todo!(); + } +} + From 2a87e2f8d7ef559d0aa6a5024803f608ccedd666 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 18:13:11 +0100 Subject: [PATCH 3/8] permutate config + impl --- benchrunner-service/src/main.rs | 92 ++++++++++++++++++++++++++------- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 768c7d5d..c810be88 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -93,30 +93,59 @@ async fn main() { let jh_runner = tokio::spawn(async move { let mut interval = tokio::time::interval(bench_interval); for run_count in 1.. { - let bench_config = bench_configs[run_count % bench_configs.len()].clone(); + const NUM_BENCH_IMPLS: usize = 2; + + let benchrun_at = SystemTime::now(); + + let factors = factorize(run_count, &[bench_configs.len(), NUM_BENCH_IMPLS]); + + let bench_config = bench_configs[factors[0]].clone(); + + let bench_impl: Box> = match factors[1] { + 0 => { + Box::new(BenchRunnerBench1Impl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + size_tx, + }) + } + 1 => { + Box::new(BenchRunnerConfirmationRateImpl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + size_tx, + }) + } + _ => unreachable!(), + }; + debug!( "Invoke bench execution (#{}) on tenant <{}> using {}", - run_count, tenant_id, bench_config + run_count, tenant_id, &bench_config ); - let benchrun_at = SystemTime::now(); - let bench_impl: Box> = if true { - Box::new(BenchRunnerBench1Impl { - benchrun_at, - tenant_config: tenant_config.clone(), - bench_config: bench_config.clone(), - funded_payer: funded_payer.clone(), - size_tx, - }) - } else { - Box::new(BenchRunnerConfirmationRateImpl { - benchrun_at, - tenant_config: tenant_config.clone(), - bench_config: bench_config.clone(), - funded_payer: funded_payer.clone(), - size_tx, - }) - }; + + // let bench_impl: Box> = if true { + // Box::new(BenchRunnerBench1Impl { + // benchrun_at, + // tenant_config: tenant_config.clone(), + // bench_config: bench_config.clone(), + // funded_payer: funded_payer.clone(), + // size_tx, + // }) + // } else { + // Box::new(BenchRunnerConfirmationRateImpl { + // benchrun_at, + // tenant_config: tenant_config.clone(), + // bench_config: bench_config.clone(), + // funded_payer: funded_payer.clone(), + // size_tx, + // }) + // }; if let Some(postgres_session) = postgres_session.as_ref() { let _dbstatus = upsert_benchrun_status( @@ -165,6 +194,29 @@ async fn main() { join_all(jh_tenant_task).await; } + + +// dimensions: least-significant first +fn factorize(i: usize, dimensions: &[usize]) -> Vec { + let mut i = i; + let mut result = Vec::new(); + for &dim in dimensions { + result.push(i % dim); + i /= dim; + } + result +} + +#[test] +fn test_factorize() { + assert_eq!(factorize(0, &[2, 3]), vec![0, 0]); + assert_eq!(factorize(1, &[2, 3]), vec![1, 0]); + assert_eq!(factorize(2, &[2, 3]), vec![0, 1]); + assert_eq!(factorize(3, &[2, 3]), vec![1, 1]); + assert_eq!(factorize(4, &[2, 3]), vec![0, 2]); + assert_eq!(factorize(5, &[2, 3]), vec![1, 2]); +} + // R: result #[async_trait] trait BenchRunner: Send + Sync + 'static { From 4a6a59487325eee7a42378fb00fc3bc9e45ddcb6 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 18:40:04 +0100 Subject: [PATCH 4/8] split metric struct --- bench/src/benches/confirmation_rate.rs | 12 ++-- bench/src/service_adapter_new.rs | 28 +++++--- benchrunner-service/src/main.rs | 69 ++++++++----------- .../src/postgres/metrics_dbstore.rs | 12 ++-- 4 files changed, 61 insertions(+), 60 deletions(-) diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index 98263feb..ee4be7af 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -12,8 +12,8 @@ use crate::benches::rpc_interface::{ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; -#[derive(Debug, serde::Serialize)] -pub struct RpcStat { +#[derive(Clone, Copy, Debug, Default, serde::Serialize)] +pub struct Metric { tx_sent: u64, tx_confirmed: u64, // in ms @@ -76,7 +76,7 @@ pub async fn send_bulk_txs_and_wait( num_txs: usize, tx_params: &BenchmarkTransactionParams, max_timeout: Duration, -) -> anyhow::Result { +) -> anyhow::Result { trace!("Get latest blockhash and generate transactions"); let hash = rpc.get_latest_blockhash().await.map_err(|err| { log::error!("Error get latest blockhash : {err:?}"); @@ -148,7 +148,7 @@ pub async fn send_bulk_txs_and_wait( 0.0 }; - Ok(RpcStat { + Ok(Metric { tx_sent, tx_send_errors, tx_confirmed, @@ -158,10 +158,10 @@ pub async fn send_bulk_txs_and_wait( }) } -fn calc_stats_avg(stats: &[RpcStat]) -> RpcStat { +fn calc_stats_avg(stats: &[Metric]) -> Metric { let len = stats.len(); - let mut avg = RpcStat { + let mut avg = Metric { tx_sent: 0, tx_send_errors: 0, tx_confirmed: 0, diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index eb5a841d..58caacd1 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -1,6 +1,13 @@ +use std::sync::Arc; +use std::time::Duration; +use log::error; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Keypair; use tokio::time::Instant; -use crate::metrics::Metric; +use crate::benches::confirmation_rate; +use crate::benches::confirmation_rate::{send_bulk_txs_and_wait}; +use crate::BenchmarkTransactionParams; use crate::service_adapter1::BenchConfig; use crate::tx_size::TxSize; @@ -8,11 +15,16 @@ pub async fn benchnew_confirmation_rate_servicerunner( bench_config: &BenchConfig, rpc_addr: String, funded_payer: Keypair, - size_tx: TxSize, -) -> Metric { - let started_at = Instant::now(); - - - - todo!() +) -> confirmation_rate::Metric { + let rpc = Arc::new(RpcClient::new(rpc_addr)); + let tx_params = BenchmarkTransactionParams { + tx_size: TxSize::Small, + cu_price_micro_lamports: bench_config.cu_price_micro_lamports, + }; + let max_timeout = Duration::from_secs(60); + let result = send_bulk_txs_and_wait(&rpc, &funded_payer, 1, &tx_params, max_timeout).await; + result.unwrap_or_else(|err| { + error!("Failed to send bulk txs and wait: {}", err); + confirmation_rate::Metric::default() + }) } diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index c810be88..f64b5f0b 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -24,7 +24,9 @@ use std::time::{Duration, SystemTime}; use async_trait::async_trait; use postgres_types::ToSql; use solana_sdk::signature::Keypair; -use bench::metrics::Metric; +use tokio::sync::OnceCell; +use bench::metrics; +use bench::benches::confirmation_rate; use bench::tx_size::TxSize; #[tokio::main] @@ -97,11 +99,11 @@ async fn main() { let benchrun_at = SystemTime::now(); - let factors = factorize(run_count, &[bench_configs.len(), NUM_BENCH_IMPLS]); + let permutation = factorize(run_count, &[bench_configs.len(), NUM_BENCH_IMPLS]); - let bench_config = bench_configs[factors[0]].clone(); + let bench_config = bench_configs[permutation[0]].clone(); - let bench_impl: Box> = match factors[1] { + let bench_impl: Box = match permutation[1] { 0 => { Box::new(BenchRunnerBench1Impl { benchrun_at, @@ -109,6 +111,7 @@ async fn main() { bench_config: bench_config.clone(), funded_payer: funded_payer.clone(), size_tx, + metric: OnceCell::new(), }) } 1 => { @@ -118,6 +121,7 @@ async fn main() { bench_config: bench_config.clone(), funded_payer: funded_payer.clone(), size_tx, + metric: OnceCell::new(), }) } _ => unreachable!(), @@ -128,25 +132,6 @@ async fn main() { run_count, tenant_id, &bench_config ); - - // let bench_impl: Box> = if true { - // Box::new(BenchRunnerBench1Impl { - // benchrun_at, - // tenant_config: tenant_config.clone(), - // bench_config: bench_config.clone(), - // funded_payer: funded_payer.clone(), - // size_tx, - // }) - // } else { - // Box::new(BenchRunnerConfirmationRateImpl { - // benchrun_at, - // tenant_config: tenant_config.clone(), - // bench_config: bench_config.clone(), - // funded_payer: funded_payer.clone(), - // size_tx, - // }) - // }; - if let Some(postgres_session) = postgres_session.as_ref() { let _dbstatus = upsert_benchrun_status( postgres_session, @@ -161,14 +146,14 @@ async fn main() { let metric = bench_impl.run_bench().await; if let Some(postgres_session) = postgres_session.as_ref() { - let save_result = bench_impl.try_save_results_postgres(&metric, postgres_session).await; + let save_result = bench_impl.try_save_results_postgres(postgres_session).await; if let Err(err) = save_result { warn!("Failed to save metrics to postgres (err {:?}) - continue", err); } } - publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await; + // publish_metrics_on_prometheus(&tenant_config, &bench_config).await; if let Some(postgres_session) = postgres_session.as_ref() { let _dbstatus = upsert_benchrun_status( @@ -219,16 +204,16 @@ fn test_factorize() { // R: result #[async_trait] -trait BenchRunner: Send + Sync + 'static { - async fn run_bench(&self) -> M; +trait BenchRunner: Send + Sync + 'static { + async fn run_bench(&self); } -trait BenchTrait: BenchRunner + BenchMetricsPostgresSaver {} +trait BenchTrait: BenchRunner + BenchMetricsPostgresSaver {} // R: result #[async_trait] -trait BenchMetricsPostgresSaver: Send + Sync + 'static { - async fn try_save_results_postgres(&self, metric: &M, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>; +trait BenchMetricsPostgresSaver: Send + Sync + 'static { + async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>; } struct BenchRunnerBench1Impl { @@ -237,26 +222,27 @@ struct BenchRunnerBench1Impl { pub bench_config: BenchConfig, pub funded_payer: Arc, pub size_tx: TxSize, + pub metric: OnceCell, } -impl BenchTrait for BenchRunnerBench1Impl {} +impl BenchTrait for BenchRunnerBench1Impl {} #[async_trait] -impl BenchRunner for BenchRunnerBench1Impl { - async fn run_bench(&self) -> Metric { - bench::service_adapter1::bench_servicerunner( +impl BenchRunner for BenchRunnerBench1Impl { + async fn run_bench(&self) { + let metric = bench::service_adapter1::bench_servicerunner( &self.bench_config, self.tenant_config.rpc_addr.clone(), self.funded_payer.insecure_clone(), self.size_tx, ) - .await + .await; } } -impl BenchTrait for BenchRunnerConfirmationRateImpl {} +impl BenchTrait for BenchRunnerConfirmationRateImpl {} struct BenchRunnerConfirmationRateImpl { pub benchrun_at: SystemTime, @@ -264,18 +250,19 @@ struct BenchRunnerConfirmationRateImpl { pub bench_config: BenchConfig, pub funded_payer: Arc, pub size_tx: TxSize, + pub metric: OnceCell, } #[async_trait] -impl BenchRunner for BenchRunnerConfirmationRateImpl { - async fn run_bench(&self) -> Metric { - bench::service_adapter_new::benchnew_confirmation_rate_servicerunner( +impl BenchRunner for BenchRunnerConfirmationRateImpl { + async fn run_bench(&self) { + let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner( &self.bench_config, self.tenant_config.rpc_addr.clone(), self.funded_payer.insecure_clone(), - self.size_tx, ) - .await + .await; + self.metric.set(metric).unwrap(); } } diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index 9df4a869..da91567f 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -1,6 +1,7 @@ use crate::args::TenantConfig; use crate::postgres::postgres_session_cache::PostgresSessionCache; -use bench::metrics::Metric; +use bench::metrics; +use bench::benches::confirmation_rate; use bench::service_adapter1::BenchConfig; use log::warn; use postgres_types::ToSql; @@ -62,8 +63,9 @@ pub async fn upsert_benchrun_status( #[async_trait] -impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { - async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { +impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { + async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { + let metric = self.metric.get().expect("metric not set"); let metricjson = serde_json::to_value(metric).unwrap(); let values: &[&(dyn ToSql + Sync)] = &[ &self.tenant_config.tenant_id, @@ -103,8 +105,8 @@ impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { #[async_trait] -impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { - async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { +impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { + async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { todo!(); } } From 8aca443d5643445df813a453fcc779254ba3c212 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 18:54:14 +0100 Subject: [PATCH 5/8] wire up confirmation_rate + bench1 --- bench/src/benches/confirmation_rate.rs | 44 +++++++++---------- bench/src/service_adapter_new.rs | 2 +- benchrunner-service/src/main.rs | 11 ++--- .../src/postgres/metrics_dbstore.rs | 41 ++++++++++++++++- migrations/create_benchrunner.sql | 37 +++++++++++++--- 5 files changed, 98 insertions(+), 37 deletions(-) diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index ee4be7af..e24c81e0 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -14,14 +14,14 @@ use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { - tx_sent: u64, - tx_confirmed: u64, + pub txs_sent: u64, + pub txs_confirmed: u64, // in ms - average_confirmation_time: f32, + pub average_confirmation_time: f32, // in slots - average_slot_confirmation_time: f32, - tx_send_errors: u64, - tx_unconfirmed: u64, + pub average_slot_confirmation_time: f32, + pub txs_send_errors: u64, + pub txs_un_confirmed: u64, } /// TC2 send multiple runs of num_txs, measure the confirmation rate @@ -149,10 +149,10 @@ pub async fn send_bulk_txs_and_wait( }; Ok(Metric { - tx_sent, - tx_send_errors, - tx_confirmed, - tx_unconfirmed, + txs_sent: tx_sent, + txs_send_errors: tx_send_errors, + txs_confirmed: tx_confirmed, + txs_un_confirmed: tx_unconfirmed, average_confirmation_time: average_confirmation_time_ms, average_slot_confirmation_time, }) @@ -162,27 +162,27 @@ fn calc_stats_avg(stats: &[Metric]) -> Metric { let len = stats.len(); let mut avg = Metric { - tx_sent: 0, - tx_send_errors: 0, - tx_confirmed: 0, - tx_unconfirmed: 0, + txs_sent: 0, + txs_send_errors: 0, + txs_confirmed: 0, + txs_un_confirmed: 0, average_confirmation_time: 0.0, average_slot_confirmation_time: 0.0, }; for stat in stats { - avg.tx_sent += stat.tx_sent; - avg.tx_send_errors += stat.tx_send_errors; - avg.tx_confirmed += stat.tx_confirmed; - avg.tx_unconfirmed += stat.tx_unconfirmed; + avg.txs_sent += stat.txs_sent; + avg.txs_send_errors += stat.txs_send_errors; + avg.txs_confirmed += stat.txs_confirmed; + avg.txs_un_confirmed += stat.txs_un_confirmed; avg.average_confirmation_time += stat.average_confirmation_time; avg.average_slot_confirmation_time += stat.average_slot_confirmation_time; } - avg.tx_sent /= len as u64; - avg.tx_send_errors /= len as u64; - avg.tx_confirmed /= len as u64; - avg.tx_unconfirmed /= len as u64; + avg.txs_sent /= len as u64; + avg.txs_send_errors /= len as u64; + avg.txs_confirmed /= len as u64; + avg.txs_un_confirmed /= len as u64; avg.average_confirmation_time /= len as f32; avg.average_slot_confirmation_time /= len as f32; diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index 58caacd1..e92276e4 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -22,7 +22,7 @@ pub async fn benchnew_confirmation_rate_servicerunner( cu_price_micro_lamports: bench_config.cu_price_micro_lamports, }; let max_timeout = Duration::from_secs(60); - let result = send_bulk_txs_and_wait(&rpc, &funded_payer, 1, &tx_params, max_timeout).await; + let result = send_bulk_txs_and_wait(&rpc, &funded_payer, bench_config.tx_count, &tx_params, max_timeout).await; result.unwrap_or_else(|err| { error!("Failed to send bulk txs and wait: {}", err); confirmation_rate::Metric::default() diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index f64b5f0b..7eeccec3 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -99,13 +99,13 @@ async fn main() { let benchrun_at = SystemTime::now(); - let permutation = factorize(run_count, &[bench_configs.len(), NUM_BENCH_IMPLS]); + let permutation = factorize(run_count, &[NUM_BENCH_IMPLS, bench_configs.len()]); - let bench_config = bench_configs[permutation[0]].clone(); + let bench_config = bench_configs[permutation[1]].clone(); - let bench_impl: Box = match permutation[1] { + let bench_impl: Box = match permutation[0] { 0 => { - Box::new(BenchRunnerBench1Impl { + Box::new(BenchRunnerConfirmationRateImpl { benchrun_at, tenant_config: tenant_config.clone(), bench_config: bench_config.clone(), @@ -115,7 +115,7 @@ async fn main() { }) } 1 => { - Box::new(BenchRunnerConfirmationRateImpl { + Box::new(BenchRunnerBench1Impl { benchrun_at, tenant_config: tenant_config.clone(), bench_config: bench_config.clone(), @@ -238,6 +238,7 @@ impl BenchRunner for BenchRunnerBench1Impl { self.size_tx, ) .await; + self.metric.set(metric).unwrap(); } } diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index da91567f..54920d89 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -83,7 +83,7 @@ impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { .execute( r#" INSERT INTO - benchrunner.bench_metrics ( + benchrunner.bench_metrics_bench1 ( tenant, ts, prio_fees, @@ -107,7 +107,44 @@ impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { #[async_trait] impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { - todo!(); + let metric = self.metric.get().expect("metric not set"); + let metricjson = serde_json::to_value(metric).unwrap(); + let values: &[&(dyn ToSql + Sync)] = &[ + &self.tenant_config.tenant_id, + &self.benchrun_at, + &(self.bench_config.cu_price_micro_lamports as i64), + &(metric.txs_sent as i64), + &(metric.txs_confirmed as i64), + &(metric.txs_un_confirmed as i64), + &(metric.average_confirmation_time as f32), + &(metric.average_slot_confirmation_time as f32), + &metricjson, + ]; + postgres_session + .get_session() + .await? + .execute( + r#" + INSERT INTO + benchrunner.bench_metrics_confirmation_rate ( + tenant, + ts, + prio_fees, + txs_sent, + txs_confirmed, + txs_un_confirmed, + average_confirmation_time_ms, + average_slot_confirmation_time_ms, + metric_json + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + "#, + values, + ) + .await?; + + + Ok(()) } } diff --git a/migrations/create_benchrunner.sql b/migrations/create_benchrunner.sql index 77d32450..240b3a5b 100644 --- a/migrations/create_benchrunner.sql +++ b/migrations/create_benchrunner.sql @@ -1,7 +1,16 @@ CREATE SCHEMA benchrunner; -CREATE TABLE benchrunner.bench_metrics ( + +CREATE TABLE benchrunner.bench_runs ( + tenant text NOT NULL, + ts timestamp NOT NULL, + status text NOT NULL, + PRIMARY KEY (tenant, ts) +); + + +CREATE TABLE benchrunner.bench_metrics_bench1 ( tenant text NOT NULL, ts timestamp NOT NULL, prio_fees int8 NOT NULL, @@ -13,14 +22,28 @@ CREATE TABLE benchrunner.bench_metrics ( PRIMARY KEY (tenant, ts) ); -CREATE TABLE benchrunner.bench_runs ( - tenant text NOT NULL, - ts timestamp NOT NULL, - status text NOT NULL, - PRIMARY KEY (tenant, ts) + +CREATE TABLE benchrunner.bench_metrics_confirmation_rate ( + tenant text NOT NULL, + ts timestamp NOT NULL, + prio_fees int8 NOT NULL, + txs_sent int8 NOT NULL, + txs_confirmed int8 NOT NULL, + txs_un_confirmed int8 NOT NULL, + average_confirmation_time_ms real NOT NULL, + average_slot_confirmation_time_ms real NOT NULL, + metric_json jsonb NOT NULL, + PRIMARY KEY (tenant, ts) ); + GRANT USAGE ON SCHEMA benchrunner TO r_benchrunner; -GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA benchrunner TO r_benchrunner; GRANT USAGE ON SCHEMA benchrunner TO ro_benchrunner; + +GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA benchrunner TO r_benchrunner; +ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT, INSERT, UPDATE ON TABLES TO r_benchrunner; + GRANT SELECT ON ALL TABLES IN SCHEMA benchrunner TO ro_benchrunner; +ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT ON TABLES TO ro_benchrunner; + +ALTER TABLE benchrunner.bench_metrics RENAME TO bench_metrics_bench1; From f3657d7db131c6104d4bd24bb87965a6ba7f3d03 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 18:57:51 +0100 Subject: [PATCH 6/8] wire up size_tx --- bench/src/service_adapter1.rs | 4 ++-- bench/src/service_adapter_new.rs | 4 +--- benchrunner-service/src/main.rs | 2 +- benchrunner-service/src/prometheus/metrics_prometheus.rs | 1 + 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/bench/src/service_adapter1.rs b/bench/src/service_adapter1.rs index a1f310bd..28b570f9 100644 --- a/bench/src/service_adapter1.rs +++ b/bench/src/service_adapter1.rs @@ -20,6 +20,7 @@ use tokio::time::Instant; #[derive(Debug, Clone)] pub struct BenchConfig { pub tx_count: usize, + pub tx_size: TxSize, pub cu_price_micro_lamports: u64, } @@ -33,11 +34,10 @@ pub async fn bench_servicerunner( bench_config: &BenchConfig, rpc_addr: String, funded_payer: Keypair, - size_tx: TxSize, ) -> Metric { let started_at = Instant::now(); - let transaction_size = match size_tx { + let transaction_size = match bench_config.tx_size { TxSize::Small => TransactionSize::Small, TxSize::Large => TransactionSize::Large, }; diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index e92276e4..2c97bdab 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -2,9 +2,7 @@ use std::sync::Arc; use std::time::Duration; use log::error; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Keypair; -use tokio::time::Instant; use crate::benches::confirmation_rate; use crate::benches::confirmation_rate::{send_bulk_txs_and_wait}; use crate::BenchmarkTransactionParams; @@ -18,7 +16,7 @@ pub async fn benchnew_confirmation_rate_servicerunner( ) -> confirmation_rate::Metric { let rpc = Arc::new(RpcClient::new(rpc_addr)); let tx_params = BenchmarkTransactionParams { - tx_size: TxSize::Small, + tx_size: bench_config.tx_size, cu_price_micro_lamports: bench_config.cu_price_micro_lamports, }; let max_timeout = Duration::from_secs(60); diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 7eeccec3..9cb64152 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -80,6 +80,7 @@ async fn main() { .iter() .map(|prio_fees| BenchConfig { tx_count, + tx_size: size_tx, cu_price_micro_lamports: *prio_fees, }) .collect_vec(); @@ -235,7 +236,6 @@ impl BenchRunner for BenchRunnerBench1Impl { &self.bench_config, self.tenant_config.rpc_addr.clone(), self.funded_payer.insecure_clone(), - self.size_tx, ) .await; self.metric.set(metric).unwrap(); diff --git a/benchrunner-service/src/prometheus/metrics_prometheus.rs b/benchrunner-service/src/prometheus/metrics_prometheus.rs index ea88b89b..5c3fb21a 100644 --- a/benchrunner-service/src/prometheus/metrics_prometheus.rs +++ b/benchrunner-service/src/prometheus/metrics_prometheus.rs @@ -14,6 +14,7 @@ lazy_static::lazy_static! { // TODO add more } +// TODO implement pub async fn publish_metrics_on_prometheus( tenant_config: &TenantConfig, _bench_config: &BenchConfig, From 1f502a3a57b0a91d114f854c504cc5866184925c Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 19:00:35 +0100 Subject: [PATCH 7/8] cleanup --- bench/src/service_adapter_new.rs | 22 ++++-- benchrunner-service/src/main.rs | 73 ++++++++----------- .../src/postgres/metrics_dbstore.rs | 26 +++---- .../src/prometheus/metrics_prometheus.rs | 2 +- 4 files changed, 56 insertions(+), 67 deletions(-) diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index 2c97bdab..06d71d14 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -1,13 +1,12 @@ -use std::sync::Arc; -use std::time::Duration; +use crate::benches::confirmation_rate; +use crate::benches::confirmation_rate::send_bulk_txs_and_wait; +use crate::service_adapter1::BenchConfig; +use crate::BenchmarkTransactionParams; use log::error; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Keypair; -use crate::benches::confirmation_rate; -use crate::benches::confirmation_rate::{send_bulk_txs_and_wait}; -use crate::BenchmarkTransactionParams; -use crate::service_adapter1::BenchConfig; -use crate::tx_size::TxSize; +use std::sync::Arc; +use std::time::Duration; pub async fn benchnew_confirmation_rate_servicerunner( bench_config: &BenchConfig, @@ -20,7 +19,14 @@ pub async fn benchnew_confirmation_rate_servicerunner( cu_price_micro_lamports: bench_config.cu_price_micro_lamports, }; let max_timeout = Duration::from_secs(60); - let result = send_bulk_txs_and_wait(&rpc, &funded_payer, bench_config.tx_count, &tx_params, max_timeout).await; + let result = send_bulk_txs_and_wait( + &rpc, + &funded_payer, + bench_config.tx_count, + &tx_params, + max_timeout, + ) + .await; result.unwrap_or_else(|err| { error!("Failed to send bulk txs and wait: {}", err); confirmation_rate::Metric::default() diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 9cb64152..fea54323 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -5,29 +5,24 @@ mod prometheus; use crate::args::{get_funded_payer_from_env, read_tenant_configs, TenantConfig}; use crate::cli::Args; -use crate::postgres::metrics_dbstore::{ - upsert_benchrun_status, BenchRunStatus, -}; +use crate::postgres::metrics_dbstore::{upsert_benchrun_status, BenchRunStatus}; use crate::postgres::postgres_session::PostgresSessionConfig; use crate::postgres::postgres_session_cache::PostgresSessionCache; -use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus; use crate::prometheus::prometheus_sync::PrometheusSync; +use async_trait::async_trait; +use bench::benches::confirmation_rate; +use bench::metrics; use bench::service_adapter1::BenchConfig; use clap::Parser; use futures_util::future::join_all; use itertools::Itertools; use log::{debug, error, info, warn}; +use solana_sdk::signature::Keypair; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use async_trait::async_trait; -use postgres_types::ToSql; -use solana_sdk::signature::Keypair; use tokio::sync::OnceCell; -use bench::metrics; -use bench::benches::confirmation_rate; -use bench::tx_size::TxSize; #[tokio::main] async fn main() { @@ -105,26 +100,20 @@ async fn main() { let bench_config = bench_configs[permutation[1]].clone(); let bench_impl: Box = match permutation[0] { - 0 => { - Box::new(BenchRunnerConfirmationRateImpl { - benchrun_at, - tenant_config: tenant_config.clone(), - bench_config: bench_config.clone(), - funded_payer: funded_payer.clone(), - size_tx, - metric: OnceCell::new(), - }) - } - 1 => { - Box::new(BenchRunnerBench1Impl { - benchrun_at, - tenant_config: tenant_config.clone(), - bench_config: bench_config.clone(), - funded_payer: funded_payer.clone(), - size_tx, - metric: OnceCell::new(), - }) - } + 0 => Box::new(BenchRunnerConfirmationRateImpl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + metric: OnceCell::new(), + }), + 1 => Box::new(BenchRunnerBench1Impl { + benchrun_at, + tenant_config: tenant_config.clone(), + bench_config: bench_config.clone(), + funded_payer: funded_payer.clone(), + metric: OnceCell::new(), + }), _ => unreachable!(), }; @@ -144,14 +133,16 @@ async fn main() { .await; } - let metric = bench_impl.run_bench().await; + bench_impl.run_bench().await; if let Some(postgres_session) = postgres_session.as_ref() { let save_result = bench_impl.try_save_results_postgres(postgres_session).await; if let Err(err) = save_result { - warn!("Failed to save metrics to postgres (err {:?}) - continue", err); + warn!( + "Failed to save metrics to postgres (err {:?}) - continue", + err + ); } - } // publish_metrics_on_prometheus(&tenant_config, &bench_config).await; @@ -180,8 +171,6 @@ async fn main() { join_all(jh_tenant_task).await; } - - // dimensions: least-significant first fn factorize(i: usize, dimensions: &[usize]) -> Vec { let mut i = i; @@ -214,7 +203,10 @@ trait BenchTrait: BenchRunner + BenchMetricsPostgresSaver {} // R: result #[async_trait] trait BenchMetricsPostgresSaver: Send + Sync + 'static { - async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>; + async fn try_save_results_postgres( + &self, + postgres_session: &PostgresSessionCache, + ) -> anyhow::Result<()>; } struct BenchRunnerBench1Impl { @@ -222,13 +214,11 @@ struct BenchRunnerBench1Impl { pub tenant_config: TenantConfig, pub bench_config: BenchConfig, pub funded_payer: Arc, - pub size_tx: TxSize, pub metric: OnceCell, } impl BenchTrait for BenchRunnerBench1Impl {} - #[async_trait] impl BenchRunner for BenchRunnerBench1Impl { async fn run_bench(&self) { @@ -242,7 +232,6 @@ impl BenchRunner for BenchRunnerBench1Impl { } } - impl BenchTrait for BenchRunnerConfirmationRateImpl {} struct BenchRunnerConfirmationRateImpl { @@ -250,7 +239,6 @@ struct BenchRunnerConfirmationRateImpl { pub tenant_config: TenantConfig, pub bench_config: BenchConfig, pub funded_payer: Arc, - pub size_tx: TxSize, pub metric: OnceCell, } @@ -262,10 +250,7 @@ impl BenchRunner for BenchRunnerConfirmationRateImpl { self.tenant_config.rpc_addr.clone(), self.funded_payer.insecure_clone(), ) - .await; + .await; self.metric.set(metric).unwrap(); } } - - - diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index 54920d89..55054ccf 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -1,13 +1,11 @@ use crate::args::TenantConfig; use crate::postgres::postgres_session_cache::PostgresSessionCache; -use bench::metrics; -use bench::benches::confirmation_rate; +use crate::{BenchMetricsPostgresSaver, BenchRunnerBench1Impl, BenchRunnerConfirmationRateImpl}; +use async_trait::async_trait; use bench::service_adapter1::BenchConfig; use log::warn; use postgres_types::ToSql; use std::time::SystemTime; -use async_trait::async_trait; -use crate::{BenchMetricsPostgresSaver, BenchRunner, BenchRunnerBench1Impl, BenchRunnerConfirmationRateImpl}; #[allow(clippy::upper_case_acronyms)] pub enum BenchRunStatus { @@ -60,11 +58,12 @@ pub async fn upsert_benchrun_status( Ok(()) } - - #[async_trait] impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { - async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { + async fn try_save_results_postgres( + &self, + postgres_session: &PostgresSessionCache, + ) -> anyhow::Result<()> { let metric = self.metric.get().expect("metric not set"); let metricjson = serde_json::to_value(metric).unwrap(); let values: &[&(dyn ToSql + Sync)] = &[ @@ -98,15 +97,16 @@ impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl { ) .await?; - Ok(()) } } - #[async_trait] impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { - async fn try_save_results_postgres(&self, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> { + async fn try_save_results_postgres( + &self, + postgres_session: &PostgresSessionCache, + ) -> anyhow::Result<()> { let metric = self.metric.get().expect("metric not set"); let metricjson = serde_json::to_value(metric).unwrap(); let values: &[&(dyn ToSql + Sync)] = &[ @@ -116,8 +116,8 @@ impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { &(metric.txs_sent as i64), &(metric.txs_confirmed as i64), &(metric.txs_un_confirmed as i64), - &(metric.average_confirmation_time as f32), - &(metric.average_slot_confirmation_time as f32), + &(metric.average_confirmation_time), + &(metric.average_slot_confirmation_time), &metricjson, ]; postgres_session @@ -143,8 +143,6 @@ impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { ) .await?; - Ok(()) } } - diff --git a/benchrunner-service/src/prometheus/metrics_prometheus.rs b/benchrunner-service/src/prometheus/metrics_prometheus.rs index 5c3fb21a..466ada70 100644 --- a/benchrunner-service/src/prometheus/metrics_prometheus.rs +++ b/benchrunner-service/src/prometheus/metrics_prometheus.rs @@ -15,7 +15,7 @@ lazy_static::lazy_static! { } // TODO implement -pub async fn publish_metrics_on_prometheus( +pub async fn _publish_metrics_on_prometheus( tenant_config: &TenantConfig, _bench_config: &BenchConfig, metric: &Metric, From 9f73f8e4a014f0937e6286565630373dab09ed9a Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 27 Mar 2024 19:05:47 +0100 Subject: [PATCH 8/8] add sql queries --- benchrunner-service/QUERIES.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 benchrunner-service/QUERIES.md diff --git a/benchrunner-service/QUERIES.md b/benchrunner-service/QUERIES.md new file mode 100644 index 00000000..851eed78 --- /dev/null +++ b/benchrunner-service/QUERIES.md @@ -0,0 +1,19 @@ + +### Inspect the BenchRunner executions +```postgresql +SELECT * FROM benchrunner.bench_runs +ORDER BY ts DESC +``` + +### Bench1 (old bench) +```postgresql +SELECT * FROM benchrunner.bench_metrics_bench1 +ORDER BY ts DESC +``` + +### Bench Confirmation Rate +```postgresql +SELECT * FROM benchrunner.bench_metrics_confirmation_rate +ORDER BY ts DESC +``` +