From 017d90c7144ca5d653ac3362ccbc30c07bcab33e Mon Sep 17 00:00:00 2001 From: Sehz Date: Tue, 14 Jan 2025 15:31:39 -0800 Subject: [PATCH] reduce fbm to only deal with producer benchmark (#4315) provide defaults for high performance --- crates/fluvio-benchmark/Makefile | 2 + .../fluvio-benchmark/src/benchmark_driver.rs | 240 +++------ crates/fluvio-benchmark/src/bin/fbm.rs | 123 ++--- .../fluvio-benchmark/src/consumer_worker.rs | 4 + crates/fluvio-benchmark/src/lib.rs | 2 - .../fluvio-benchmark/src/producer_worker.rs | 69 +-- crates/fluvio-benchmark/src/stats.rs | 499 ------------------ .../fluvio-benchmark/src/stats_collector.rs | 287 +--------- 8 files changed, 141 insertions(+), 1085 deletions(-) create mode 100644 crates/fluvio-benchmark/Makefile delete mode 100644 crates/fluvio-benchmark/src/stats.rs diff --git a/crates/fluvio-benchmark/Makefile b/crates/fluvio-benchmark/Makefile new file mode 100644 index 0000000000..217e7f0e91 --- /dev/null +++ b/crates/fluvio-benchmark/Makefile @@ -0,0 +1,2 @@ +build: + cargo build --bin fbm --release \ No newline at end of file diff --git a/crates/fluvio-benchmark/src/benchmark_driver.rs b/crates/fluvio-benchmark/src/benchmark_driver.rs index 2f6214acd5..5972bd3d5e 100644 --- a/crates/fluvio-benchmark/src/benchmark_driver.rs +++ b/crates/fluvio-benchmark/src/benchmark_driver.rs @@ -1,237 +1,118 @@ -use std::time::Instant; +use std::sync::{atomic::Ordering, Arc}; use anyhow::Result; -use async_channel::{unbounded, Sender, Receiver}; -use tracing::{debug, info}; +use async_channel::{unbounded, Receiver}; +use tracing::debug; use fluvio_future::{task::spawn, future::timeout, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; use crate::{ benchmark_config::BenchmarkConfig, producer_worker::ProducerWorker, - consumer_worker::ConsumerWorker, stats_collector::StatsWorker, stats::AllStatsSync, + stats_collector::ProduceStat, }; pub struct BenchmarkDriver {} impl BenchmarkDriver { - pub async fn run_samples(config: BenchmarkConfig, all_stats: AllStatsSync) -> Result<()> { + pub async fn run_samples(config: BenchmarkConfig) -> Result<()> { // Works send results to stats collector - let (tx_stats, rx_stats) = unbounded(); + //let (tx_stats, rx_stats) = unbounded(); - // Producers alert driver on success. 
- let (tx_success, mut rx_success) = unbounded(); let mut tx_controls = Vec::new(); let mut workers_jh = Vec::new(); + let stat = Arc::new(ProduceStat::default()); + // Set up producers for producer_id in 0..config.num_concurrent_producer_workers { + println!("starting up producer {}", producer_id); let (tx_control, rx_control) = unbounded(); - let worker = ProducerWorker::new(producer_id, config.clone(), tx_stats.clone()).await?; + let worker = ProducerWorker::new(producer_id, config.clone(), stat.clone()).await?; let jh = spawn(timeout( config.worker_timeout, - ProducerDriver::main_loop(rx_control, tx_success.clone(), worker), + ProducerDriver::main_loop(rx_control, worker), )); + + tx_control.send(ControlMessage::SendBatch).await?; tx_controls.push(tx_control); workers_jh.push(jh); } - debug!("Producer threads spawned successfully"); - - // Set up consumers - // Drivers tell consumers when they can stop trying to consume - let mut tx_stop = Vec::new(); - for partition in 0..config.num_partitions { - for consumer_number in 0..config.num_concurrent_consumers_per_partition { - let (tx_control, rx_control) = unbounded(); - let (tx, rx_stop) = unbounded(); - tx_stop.push(tx); - let consumer_id = partition * 10000000 + consumer_number; - let allocation_hint = config.num_records_per_producer_worker_per_batch - * config.num_concurrent_producer_workers - / config.num_partitions; - let worker = ConsumerWorker::new( - config.clone(), - consumer_id, - tx_stats.clone(), - rx_stop.clone(), - partition, - allocation_hint, - ) - .await?; - let jh = spawn(timeout( - config.worker_timeout, - ConsumerDriver::main_loop(rx_control, tx_success.clone(), worker), - )); - tx_controls.push(tx_control); - - workers_jh.push(jh); - } - } - debug!("Consumer threads spawned successfully"); - let (tx_control, rx_control) = unbounded(); - let worker = StatsWorker::new(tx_stop, rx_stats, config.clone(), all_stats); - let jh = spawn(timeout( - config.worker_timeout, - StatsDriver::main_loop(rx_control, tx_success, worker), - )); - workers_jh.push(jh); - tx_controls.push(tx_control); - debug!("Stats collector thread spawned successfully"); - - let num_expected_messages = workers_jh.len(); - - for i in 0..config.num_samples + 1 { - let now = Instant::now(); - // Prepare for batch - debug!("Preparing for batch"); - send_control_message(&mut tx_controls, ControlMessage::PrepareForBatch).await?; - expect_success(&mut rx_success, &config, num_expected_messages).await?; - - // Do the batch - debug!("Sending batch"); - send_control_message(&mut tx_controls, ControlMessage::SendBatch).await?; - expect_success(&mut rx_success, &config, num_expected_messages).await?; - - // Clean up the batch - debug!("Cleaning up batch"); - send_control_message( - &mut tx_controls, - ControlMessage::CleanupBatch { - produce_stats: i != 0, - }, - ) - .await?; - expect_success(&mut rx_success, &config, num_expected_messages).await?; + println!("benchmark started"); - // Wait between batches - debug!( - "Waiting {:?} between samples", - config.duration_between_samples - ); + // sleep every second + let max_run = 20; - let elapsed = now.elapsed(); - sleep(config.duration_between_samples).await; + // delay 1 seconds, so produce can start + sleep(std::time::Duration::from_secs(1)).await; - if i != 0 { - info!( - "Sample {} / {} complete, took {:?} + {:?}", - i, config.num_samples, elapsed, config.duration_between_samples - ); - } - } - // Close all worker tasks. 
- send_control_message(&mut tx_controls, ControlMessage::Exit).await?; - for jh in workers_jh { - timeout(config.worker_timeout, jh).await???; + for _run in 0..max_run { + // get current stats + + let run_start_num_messages = stat.message_send.load(Ordering::Relaxed); + let run_start_bytes = stat.message_bytes.load(Ordering::Relaxed); + + let start_time = std::time::Instant::now(); + sleep(std::time::Duration::from_secs(2)).await; + let elapse = start_time.elapsed().as_millis(); + + let run_end_bytes = stat.message_bytes.load(Ordering::Relaxed); + let run_end_num_messages = stat.message_send.load(Ordering::Relaxed); + let bytes_send = run_end_bytes - run_start_bytes; + let message_send = run_end_num_messages - run_start_num_messages; + + println!("total bytes send: {}", bytes_send); + println!("total message send: {}", message_send); + let bytes_per_sec = (bytes_send as f64 / elapse as f64) * 1000.0; + let human_readable_bytes = format!("{:9.1}mb/s", bytes_per_sec / 1000000.0); + let message_per_sec = ((message_send as f64 / elapse as f64) * 1000.0).round(); + println!( + "message: per second: {}, bytes per sec: {}, ", + message_per_sec, human_readable_bytes + ); } + stat.end.store(true, Ordering::Relaxed); + Ok(()) } - pub async fn run_benchmark(config: BenchmarkConfig, all_stats: AllStatsSync) -> Result<()> { + pub async fn run_benchmark(config: BenchmarkConfig) -> Result<()> { // Create topic for this run let new_topic = TopicSpec::new_computed(config.num_partitions as u32, 1, None); - debug!("Create topic spec"); let admin = FluvioAdmin::connect().await?; debug!("Connected to admin"); admin .create(config.topic_name.clone(), false, new_topic) .await?; - debug!("Topic created successfully {}", config.topic_name); - let result = BenchmarkDriver::run_samples(config.clone(), all_stats.clone()).await; + println!("created topic {}", config.topic_name); + let result = BenchmarkDriver::run_samples(config.clone()).await; + + println!("Benchmark completed"); + sleep(std::time::Duration::from_millis(100)).await; + + if let Err(result_err) = result { + println!("Error running samples: {:#?}", result_err); + } // Clean up topic admin.delete::(config.topic_name.clone()).await?; - debug!("Topic deleted successfully {}", config.topic_name); + print!("Topic deleted successfully {}", config.topic_name); - result?; Ok(()) } } -async fn send_control_message( - tx_control: &mut [Sender], - message: ControlMessage, -) -> Result<()> { - for tx_control in tx_control.iter_mut() { - tx_control.send(message).await?; - } - Ok(()) -} - -async fn expect_success( - rx_success: &mut Receiver>, - config: &BenchmarkConfig, - num_expected_messages: usize, -) -> Result<()> { - for _ in 0..num_expected_messages { - timeout(config.worker_timeout, rx_success.recv()).await???; - } - Ok(()) -} - struct ProducerDriver; impl ProducerDriver { - async fn main_loop( - rx: Receiver, - tx: Sender>, - mut worker: ProducerWorker, - ) -> Result<()> { - loop { - match rx.recv().await? { - ControlMessage::PrepareForBatch => { - worker.prepare_for_batch().await; - tx.send(Ok(())).await?; - } - ControlMessage::SendBatch => tx.send(worker.send_batch().await).await?, - ControlMessage::CleanupBatch { .. 
} => tx.send(Ok(())).await?, - ControlMessage::Exit => return Ok(()), - }; - } - } -} -struct ConsumerDriver; - -impl ConsumerDriver { - async fn main_loop( - rx: Receiver, - tx: Sender>, - mut worker: ConsumerWorker, - ) -> Result<()> { + async fn main_loop(rx: Receiver, mut worker: ProducerWorker) -> Result<()> { loop { match rx.recv().await? { - ControlMessage::PrepareForBatch => tx.send(Ok(())).await?, - ControlMessage::SendBatch => tx.send(worker.consume().await).await?, - ControlMessage::CleanupBatch { .. } => tx.send(worker.send_results().await).await?, - ControlMessage::Exit => return Ok(()), - }; - } - } -} - -struct StatsDriver; -impl StatsDriver { - async fn main_loop( - rx: Receiver, - tx: Sender>, - mut worker: StatsWorker, - ) -> Result<()> { - loop { - match rx.recv().await? { - ControlMessage::PrepareForBatch => { - tx.send(Ok(())).await?; - } ControlMessage::SendBatch => { - tx.send(worker.collect_send_recv_messages().await).await? - } - ControlMessage::CleanupBatch { produce_stats } => { - let results = worker.validate().await; - if produce_stats { - worker.compute_stats().await; + println!("producer send batch"); + if let Err(err) = worker.send_batch().await { + println!("producer send batch error: {:#?}", err); } - worker.new_batch(); - tx.send(results).await?; } - ControlMessage::Exit => return Ok(()), }; } } @@ -239,8 +120,5 @@ impl StatsDriver { #[derive(Clone, Copy, Debug)] enum ControlMessage { - PrepareForBatch, SendBatch, - CleanupBatch { produce_stats: bool }, - Exit, } diff --git a/crates/fluvio-benchmark/src/bin/fbm.rs b/crates/fluvio-benchmark/src/bin/fbm.rs index 1c60fe5515..f285e76275 100644 --- a/crates/fluvio-benchmark/src/bin/fbm.rs +++ b/crates/fluvio-benchmark/src/bin/fbm.rs @@ -1,18 +1,6 @@ -use std::{ - fs::File, - io::{Read, Write}, - path::PathBuf, - sync::Arc, - mem, - time::Duration, -}; - use clap::{arg, Parser}; use anyhow::Result; -use fluvio_cli_common::install::fluvio_base_dir; -use fluvio_future::{task::run_block_on, sync::Mutex, future::timeout}; -use futures_util::FutureExt; use fluvio::{Compression, Isolation}; use fluvio_benchmark::{ benchmark_config::{ @@ -24,9 +12,8 @@ use fluvio_benchmark::{ Seconds, Millis, }, benchmark_driver::BenchmarkDriver, - stats::{AllStats, AllStatsSync}, - BenchmarkError, }; +use fluvio_future::task::run_block_on; fn main() -> Result<()> { fluvio_future::subscriber::init_logger(); @@ -38,7 +25,7 @@ fn main() -> Result<()> { } // TODO accept directory of files. 
- let matrices = if args.test_cluster { + let mut matrices = if args.test_cluster { test_configs() } else { match args.config { @@ -47,81 +34,33 @@ fn main() -> Result<()> { } }; - let all_stats = Arc::new(Mutex::new(AllStats::default())); - let previous = load_previous_stats(); + for matrix in &mut matrices { + if let Some(producer_batch_size) = args.producer_batch_size { + matrix.producer_config.batch_size = vec![producer_batch_size]; + } + if let Some(record_size) = args.record_size { + matrix.load_config.record_size = vec![record_size]; + } + } + + run_block_on(run_benchmark(matrices)) +} + +async fn run_benchmark(matrices: Vec) -> Result<()> { + // let previous = load_previous_stats(); - println!("# Fluvio Benchmark Results"); + //println!("# Fluvio Benchmark Results"); for matrix in matrices { - println!("## Matrix: {}", matrix.shared_config.matrix_name); + // println!(" Matrix: {}", matrix.shared_config.matrix_name); + println!("{:#?}", matrix); for (i, config) in matrix.into_iter().enumerate() { - run_block_on(timeout( - // Give time for workers to clean up if workers timeout. - config.worker_timeout + Duration::from_secs(10), - BenchmarkDriver::run_benchmark(config.clone(), all_stats.clone()), - ))??; + BenchmarkDriver::run_benchmark(config.clone()).await?; println!("### {}: Iteration {:3.0}", config.matrix_name, i); println!("{}", config.to_markdown()); println!(); - run_block_on( - all_stats - .lock() - .map(|a| println!("{}", a.to_markdown(&config))), - ); - if let Some(other) = previous.as_ref() { - run_block_on( - all_stats - .lock() - .map(|a| println!("{}", a.compare_stats(&config, other))), - ); - println!(); - } } } - let mut all_stats = run_block_on(take_stats(all_stats)); - - if let Some(previous) = previous { - all_stats.merge(&previous) - } - write_stats(all_stats) -} - -async fn take_stats(all_stats: AllStatsSync) -> AllStats { - let mut guard = all_stats.lock().await; - mem::take(&mut *guard) -} -fn benchmarking_dir() -> Result { - let dir_path = fluvio_base_dir()?.join("benchmarks"); - if !dir_path.exists() { - std::fs::create_dir_all(&dir_path)?; - } - Ok(dir_path) -} - -fn historic_run_path() -> Result { - let mut path = benchmarking_dir()?; - path.push("previous"); - Ok(path) -} - -fn load_previous_stats() -> Option { - let mut file = File::open(historic_run_path().ok()?).ok()?; - let mut buffer: Vec = Vec::new(); - file.read_to_end(&mut buffer).ok()?; - AllStats::decode(&buffer).ok() -} - -fn write_stats(stats: AllStats) -> Result<()> { - let encoded = stats.encode(); - - let mut file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(historic_run_path()?) 
- .map_err(|e| BenchmarkError::ErrorWithExplanation(format!("{e:?}")))?; - file.write(&encoded) - .map_err(|e| BenchmarkError::ErrorWithExplanation(format!("{e:?}")))?; Ok(()) } @@ -135,9 +74,9 @@ fn print_example_config() { worker_timeout_seconds: Seconds::new(3600), }, producer_config: FluvioProducerConfig { - batch_size: vec![16000], + batch_size: vec![1048576], queue_size: vec![100], - linger_millis: vec![Millis::new(10)], + linger_millis: vec![Millis::new(100)], server_timeout_millis: vec![Millis::new(5000)], compression: vec![ Compression::None, @@ -166,7 +105,7 @@ fn print_example_config() { ], num_concurrent_producer_workers: vec![1], num_concurrent_consumers_per_partition: vec![1], - record_size: vec![1000], + record_size: vec![5000], }, }; println!("{}", serde_yaml::to_string(&example_config).unwrap()); @@ -226,9 +165,9 @@ fn default_configs() -> Vec { worker_timeout_seconds: Seconds::new(3000), }, producer_config: FluvioProducerConfig { - batch_size: vec![16000], - queue_size: vec![100], - linger_millis: vec![Millis::new(10)], + batch_size: vec![1000000], + queue_size: vec![10], + linger_millis: vec![Millis::new(0)], server_timeout_millis: vec![Millis::new(5000)], compression: vec![Compression::None], isolation: vec![Isolation::ReadUncommitted], @@ -244,11 +183,11 @@ fn default_configs() -> Vec { num_partitions: vec![1], }, load_config: BenchmarkLoadConfig { - num_records_per_producer_worker_per_batch: vec![100, 1000, 10000], + num_records_per_producer_worker_per_batch: vec![100], record_key_allocation_strategy: vec![RecordKeyAllocationStrategy::NoKey], num_concurrent_producer_workers: vec![1], num_concurrent_consumers_per_partition: vec![1], - record_size: vec![1000], + record_size: vec![5000], }, }] } @@ -268,4 +207,10 @@ struct Args { /// Run a suite of tests to ensure fluvio is behaving as expected #[arg(short, long, exclusive(true))] test_cluster: bool, + + #[arg(long)] + producer_batch_size: Option, + + #[arg(long)] + record_size: Option, } diff --git a/crates/fluvio-benchmark/src/consumer_worker.rs b/crates/fluvio-benchmark/src/consumer_worker.rs index b3a9ab44eb..7783fe4cb7 100644 --- a/crates/fluvio-benchmark/src/consumer_worker.rs +++ b/crates/fluvio-benchmark/src/consumer_worker.rs @@ -52,6 +52,7 @@ impl ConsumerWorker { }) } + /* pub async fn consume(&mut self) -> Result<()> { self.received.clear(); loop { @@ -78,7 +79,9 @@ impl ConsumerWorker { } } } + */ + /* pub async fn send_results(&mut self) -> Result<()> { for (record, recv_time) in self.received.iter() { let data = record.get_value().as_utf8_lossy_string(); @@ -92,4 +95,5 @@ impl ConsumerWorker { } Ok(()) } + */ } diff --git a/crates/fluvio-benchmark/src/lib.rs b/crates/fluvio-benchmark/src/lib.rs index 2076236e67..4286e80103 100644 --- a/crates/fluvio-benchmark/src/lib.rs +++ b/crates/fluvio-benchmark/src/lib.rs @@ -8,12 +8,10 @@ use rand::{distributions::Alphanumeric, Rng}; use fluvio_future::future::TimeoutError; use fluvio::{RecordKey, FluvioError}; -pub mod consumer_worker; pub mod benchmark_config; pub mod producer_worker; pub mod stats_collector; pub mod benchmark_driver; -pub mod stats; pub struct BenchmarkRecord { pub key: RecordKey, diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs index 98535cea19..19d0e13378 100644 --- a/crates/fluvio-benchmark/src/producer_worker.rs +++ b/crates/fluvio-benchmark/src/producer_worker.rs @@ -1,17 +1,20 @@ -use std::time::Instant; +use std::sync::{atomic::Ordering, Arc}; -use async_channel::Sender; use 
anyhow::Result; -use fluvio::{TopicProducerPool, Fluvio, RecordKey, TopicProducerConfigBuilder}; +use fluvio::{ + dataplane::{bytes::Bytes, record::RecordData}, + Fluvio, RecordKey, TopicProducerConfigBuilder, TopicProducerPool, +}; use crate::{ benchmark_config::{ - BenchmarkConfig, benchmark_matrix::{RecordKeyAllocationStrategy, SHARED_KEY}, + BenchmarkConfig, }, - BenchmarkRecord, generate_random_string, BenchmarkError, - stats_collector::StatsCollectorMessage, + generate_random_string, + stats_collector::ProduceStat, + BenchmarkRecord, }; pub struct ProducerWorker { @@ -19,29 +22,27 @@ pub struct ProducerWorker { records_to_send: Option>, config: BenchmarkConfig, producer_id: u64, - tx_to_stats_collector: Sender, + stat: Arc, } impl ProducerWorker { pub async fn new( producer_id: u64, config: BenchmarkConfig, - tx_to_stats_collector: Sender, + stat: Arc, ) -> Result { let fluvio = Fluvio::connect().await?; let fluvio_config = TopicProducerConfigBuilder::default() .batch_size(config.producer_batch_size as usize) .batch_queue_size(config.producer_queue_size as usize) + .max_request_size(3200000) .linger(config.producer_linger) // todo allow alternate partitioner .compression(config.producer_compression) .timeout(config.producer_server_timeout) .isolation(config.producer_isolation) .delivery_semantic(config.producer_delivery_semantic) - .build() - .map_err(|e| { - BenchmarkError::ErrorWithExplanation(format!("Fluvio topic config error: {e:?}")) - })?; + .build()?; let fluvio_producer = fluvio .topic_producer_with_config(config.topic_name.clone(), fluvio_config) .await?; @@ -50,7 +51,7 @@ impl ProducerWorker { records_to_send: None, config, producer_id, - tx_to_stats_collector, + stat, }) } pub async fn prepare_for_batch(&mut self) { @@ -77,27 +78,31 @@ impl ProducerWorker { } pub async fn send_batch(&mut self) -> Result<()> { - for record in self.records_to_send.take().ok_or_else(|| { - BenchmarkError::ErrorWithExplanation( - "prepare_for_batch() not called on PrdoucerWorker".to_string(), - ) - })? 
{ - self.tx_to_stats_collector - .send(StatsCollectorMessage::MessageSent { - hash: record.hash, - send_time: Instant::now(), - num_bytes: record.data.len() as u64, - }) - .await?; + println!("producer is sending batch"); + let data_len = self.config.record_size as usize; + let data = Bytes::from(generate_random_string(data_len)); + // let record = BenchmarkRecord::new(RecordKey::NULL, rando_datam); + let record_data: RecordData = data.into(); + + loop { + if self.stat.end.load(Ordering::Relaxed) { + self.fluvio_producer.flush().await?; + break; + } + + //println!("Sending record: {}", records.len()); + self.stat + .message_bytes + .fetch_add(data_len as u64, Ordering::Relaxed); + self.stat.message_send.fetch_add(1, Ordering::Relaxed); - self.fluvio_producer.send(record.key, record.data).await?; + self.fluvio_producer + .send(RecordKey::NULL, record_data.clone()) + .await?; } - self.fluvio_producer.flush().await?; - self.tx_to_stats_collector - .send(StatsCollectorMessage::ProducerFlushed { - flush_time: Instant::now(), - }) - .await?; + + println!("producer is done sending batch"); + Ok(()) } } diff --git a/crates/fluvio-benchmark/src/stats.rs b/crates/fluvio-benchmark/src/stats.rs deleted file mode 100644 index 4c86f060c7..0000000000 --- a/crates/fluvio-benchmark/src/stats.rs +++ /dev/null @@ -1,499 +0,0 @@ -use std::{ - time::{Instant, Duration}, - collections::{HashMap, BTreeMap}, - fmt::{Formatter, Display}, - sync::Arc, -}; -use fluvio_future::sync::Mutex; -use hdrhistogram::Histogram; -use madato::yaml::mk_md_table_from_yaml; -use tracing::{trace, debug}; -use serde::{Serialize, Deserialize}; -use statrs::distribution::{StudentsT, ContinuousCDF}; -use statrs::statistics::Statistics; -use crate::{stats_collector::BatchStats, benchmark_config::BenchmarkConfig, BenchmarkError}; - -pub const P_VALUE: f64 = 0.001; -// Used to compare if two p_values are equal in TTestResult -const P_VALUE_EPSILON: f64 = 0.00005; - -const HIST_PRECISION: u8 = 3; - -pub type AllStatsSync = Arc>; - -#[derive(Default, Serialize, Deserialize)] -pub struct AllStats(HashMap); - -impl AllStats { - pub fn iter(&self) -> std::collections::hash_map::Iter<'_, BenchmarkConfig, BenchmarkStats> { - self.0.iter() - } - pub fn encode(&self) -> Vec { - bincode::serialize(self).unwrap() - } - - pub fn decode(bytes: &[u8]) -> Result { - let decoded: Self = bincode::deserialize(bytes).map_err(|_| { - BenchmarkError::ErrorWithExplanation("Failed to deserialized".to_string()) - })?; - Ok(decoded) - } - pub fn compare_stats(&self, config: &BenchmarkConfig, other: &AllStats) -> String { - let stats = self.0.get(config).unwrap(); - - if let Some(other_stats) = other.0.get(config) { - stats.compare(other_stats, config) - } else { - "**No previous results for config found.**".to_string() - } - } - - /// Merges the maps of config -> stats, giving priority to self in the case of duplicate - /// configs - pub fn merge(&mut self, other: &AllStats) { - for (config, results) in other.iter() { - if !self.0.contains_key(config) { - self.0.insert(config.clone(), results.clone()); - } - } - } - - pub fn to_markdown(&self, config: &BenchmarkConfig) -> String { - let mut md = String::new(); - if let Some(stats) = self.0.get(config) { - let values = stats.data.get(&Variable::Latency).unwrap(); - let mut hist: Histogram = Histogram::new(HIST_PRECISION).unwrap(); - for v in values.iter() { - hist += *v; - } - let mut latency_yaml = "- Variable: Latency\n".to_string(); - for percentile in [0.0, 0.5, 0.95, 0.99, 1.0] { - 
latency_yaml.push_str(&format!( - " p{percentile:4.2}: {}\n", - Variable::Latency.format(hist.value_at_quantile(percentile)) - )); - } - md.push_str("**Per Record E2E Latency**\n\n"); - md.push_str(&mk_md_table_from_yaml(&latency_yaml, &None)); - let mut throughput_yaml = String::new(); - for (variable, description) in [ - (Variable::ProducerThroughput, "First Produced Message <-> Producer Flush Complete"), - (Variable::ConsumerThroughput, "First Consumed Message (First Time Consumed) <-> Last Consumed Message (First Time Consumed)"), - (Variable::CombinedThroughput, "First Produced Message <-> Last Consumed Message (First Time Consumed)"), - ] { - throughput_yaml.push_str(&format!("- Variable: {variable}\n")); - let values = stats.data.get(&variable).unwrap(); - let mut hist: Histogram = Histogram::new(HIST_PRECISION).unwrap(); - for v in values.iter() { - hist += *v; - } - - for (label, percentile) in [("Min", 0.0), ("Median", 0.5), ("Max", 1.0)] { - throughput_yaml.push_str(&format!( - " {}: {}\n", - label, - variable.format(hist.value_at_quantile(percentile)) - )); - } - - throughput_yaml.push_str(&format!(" Description: \"{description}\"\n")); - } - md.push_str("\n\n**Throughput (Total Produced Bytes / Time)**\n\n"); - md.push_str(&mk_md_table_from_yaml(&throughput_yaml, &None)); - md.push('\n'); - } else { - md.push_str("Stats unavailable\n"); - } - md - } - - pub fn compute_stats(&mut self, config: &BenchmarkConfig, data: &BatchStats) { - let mut first_produce_time: Option = None; - let last_produce_time = data.last_flush_time.unwrap(); - let mut first_consume_time: Option = None; - let mut last_consume_time: Option = None; // TODO this is just the first time a single message was received... what should behavior be when multiple consumers - let mut num_bytes = 0; - let mut latency = Vec::new(); - for record in data.iter() { - latency.push(record.first_recv_latency().as_micros() as u64); - let produced_time = record.send_time.unwrap(); - let consumed_time = record.first_received_time.unwrap(); - if let Some(p) = first_produce_time { - if produced_time < p { - first_produce_time = Some(produced_time); - } - } else { - first_produce_time = Some(produced_time); - }; - if let Some(c) = first_consume_time { - if consumed_time < c { - first_consume_time = Some(consumed_time); - } - } else { - first_consume_time = Some(consumed_time); - }; - if let Some(c) = last_consume_time { - if consumed_time > c { - last_consume_time = Some(consumed_time); - } - } else { - last_consume_time = Some(consumed_time); - }; - num_bytes += record.num_bytes.unwrap(); - } - let produce_time = last_produce_time - first_produce_time.unwrap(); - let consume_time = last_consume_time.unwrap() - first_consume_time.unwrap(); - let combined_time = last_consume_time.unwrap() - first_produce_time.unwrap(); - - self.record_data(config, Variable::Latency, latency); - self.record_data( - config, - Variable::ProducerThroughput, - vec![(num_bytes as f64 / produce_time.as_secs_f64()) as u64], - ); - self.record_data( - config, - Variable::ConsumerThroughput, - vec![(num_bytes as f64 / consume_time.as_secs_f64()) as u64], - ); - self.record_data( - config, - Variable::CombinedThroughput, - vec![(num_bytes as f64 / combined_time.as_secs_f64()) as u64], - ); - } - - fn record_data(&mut self, config: &BenchmarkConfig, variable: Variable, mut values: Vec) { - let benchmark_stats = self - .0 - .entry(config.clone()) - .or_insert_with(|| BenchmarkStats::new(config)); - let entry = benchmark_stats.data.entry(variable).or_default(); - 
entry.append(&mut values); - } -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct BenchmarkStats { - data: BTreeMap>, - config: BenchmarkConfig, -} - -impl BenchmarkStats { - pub fn compare(&self, other: &BenchmarkStats, config: &BenchmarkConfig) -> String { - let mut md = String::new(); - let mut yaml = String::new(); - for (variable, samples) in self.data.iter() { - yaml.push_str(&format!("- Variable: {variable}\n")); - if let Some(other_samples) = other.data.get(variable) { - let (samples, other_samples) = if samples.len() == config.num_samples { - let samples: Vec = samples.iter().map(|x| *x as f64).collect(); - let other_samples: Vec = other_samples.iter().map(|x| *x as f64).collect(); - (samples, other_samples) - } else { - let items_per_sample = samples.len() / config.num_samples; - let samples: Vec = (0..config.num_samples) - .map(|i| { - *samples[i * items_per_sample..(i + 1) * items_per_sample] - .iter() - .max() - .unwrap() as f64 - }) - .collect(); - let other_samples: Vec = (0..config.num_samples) - .map(|i| { - *other_samples[i * items_per_sample..(i + 1) * items_per_sample] - .iter() - .max() - .unwrap() as f64 - }) - .collect(); - (samples, other_samples) - }; - match variable.compare(&samples, &other_samples) { - CompareResult::Better { - previous, - next, - p_value, - } => { - yaml.push_str(" Change: Better\n"); - yaml.push_str(&format!( - " Previous: {}\n", - variable.format(previous as u64) - )); - yaml.push_str(&format!(" Current: {}\n", variable.format(next as u64))); - yaml.push_str(&format!(" P-Value: {p_value:7.5}\n")); - } - CompareResult::Worse { - previous, - next, - p_value, - } => { - yaml.push_str(" Change: Worse\n"); - yaml.push_str(&format!( - " Previous: {}\n", - variable.format(previous as u64) - )); - yaml.push_str(&format!(" Current: {}\n", variable.format(next as u64))); - yaml.push_str(&format!(" P-Value: {p_value:7.5}\n")); - } - CompareResult::NoChange => { - yaml.push_str(" Change: None\n"); - } - CompareResult::Uncomparable => { - yaml.push_str(" Change: Uncomparable\n"); - } - } - } else { - debug!("Key not found: {variable}"); - } - } - if !self.data.is_empty() { - md.push_str(&format!( - "**Comparision with previous results: {} @ {}**\n\n", - other.config.current_profile, other.config.timestamp - )); - md.push_str(&mk_md_table_from_yaml(&yaml, &None)); - } else { - md.push_str("**No comparison variables found**"); - } - md - } - - pub fn new(config: &BenchmarkConfig) -> Self { - Self { - data: Default::default(), - config: config.clone(), - } - } -} - -#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Ord, PartialOrd)] -pub enum Variable { - Latency, - ProducerThroughput, - ConsumerThroughput, - CombinedThroughput, -} -impl Display for Variable { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Variable::Latency => write!(f, "Latency"), - Variable::ProducerThroughput => write!(f, "Producer Throughput"), - Variable::ConsumerThroughput => write!(f, "Consumer Throughput"), - Variable::CombinedThroughput => write!(f, "Combined Throughput"), - } - } -} - -impl Variable { - pub fn compare(&self, a: &[f64], b: &[f64]) -> CompareResult { - if a.len() != b.len() { - return CompareResult::Uncomparable; - } - let a_mean = a.mean(); - let b_mean = b.mean(); - - match two_sample_t_test(a_mean, b_mean, a.std_dev(), b.std_dev(), a.len(), P_VALUE) { - TTestResult::X1GreaterThanX2(p) => self.greater(a_mean, b_mean, p), - TTestResult::FailedToRejectH0 => CompareResult::NoChange, - 
TTestResult::X1LessThanX2(p) => self.less(a_mean, b_mean, p), - } - } - - fn greater(&self, a_mean: f64, b_mean: f64, p_value: f64) -> CompareResult { - match self { - Variable::Latency => CompareResult::Worse { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::ProducerThroughput => CompareResult::Better { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::ConsumerThroughput => CompareResult::Better { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::CombinedThroughput => CompareResult::Better { - previous: b_mean, - next: a_mean, - p_value, - }, - } - } - fn less(&self, a_mean: f64, b_mean: f64, p_value: f64) -> CompareResult { - match self { - Variable::Latency => CompareResult::Better { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::ProducerThroughput => CompareResult::Worse { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::ConsumerThroughput => CompareResult::Worse { - previous: b_mean, - next: a_mean, - p_value, - }, - Variable::CombinedThroughput => CompareResult::Worse { - previous: b_mean, - next: a_mean, - p_value, - }, - } - } - - fn format(&self, v: u64) -> String { - match self { - Variable::Latency => format!("{:>9?}", Duration::from_micros(v)), - Variable::ProducerThroughput => format!("{:9.3}mb/s", v as f64 / 1000000.0), - Variable::ConsumerThroughput => format!("{:9.3}mb/s", v as f64 / 1000000.0), - Variable::CombinedThroughput => format!("{:9.3}mb/s", v as f64 / 1000000.0), - } - } -} - -#[derive(Copy, Clone, Debug)] -pub enum CompareResult { - /// Variable performance is better at the p=0.001 level - Better { - previous: f64, - next: f64, - p_value: f64, - }, - /// Variable performance is worse at the p=0.001 level - Worse { - previous: f64, - next: f64, - p_value: f64, - }, - /// No changed detected at p = .001 level - NoChange, - /// The BenchmarkStats do not have the same variables so they cannot be compared - Uncomparable, -} - -pub fn two_sample_t_test( - x1: f64, - x2: f64, - std_dev_1: f64, - std_dev_2: f64, - num_samples: usize, - p_value: f64, -) -> TTestResult { - // Welchs-t-test - // https://en.wikipedia.org/wiki/Student%27s_t-test#Equal_sample_sizes_and_variance - // https://en.wikipedia.org/wiki/Welch%27s_t-test - // https://www.statology.org/welchs-t-test/ - - let v1 = std_dev_1 * std_dev_1; - let v2 = std_dev_2 * std_dev_2; - let n = num_samples as f64; - - let t = (x1 - x2) / (v1 / n + v2 / n).sqrt(); - let df_numerator = (v1 / n + v2 / n).powi(2); - let df_denominator = (v1 / n).powi(2) / (n - 1.0) + (v2 / n).powi(2) / (n - 1.0); - let degrees_of_freedom = df_numerator / df_denominator; - - let students_t_dist = - StudentsT::new(0.0, 1.0, degrees_of_freedom).expect("Failed to create students t dist"); - - // https://www.omnicalculator.com/statistics/p-value - // Left-tailed test: p-value = cdf(x) - // Right-tailed test: p-value = 1 - cdf(x) - // Two-tailed test: p-value = 2 * min{cdf(x) , 1 - cdf(x)} - let left_tailed_p_value = students_t_dist.cdf(t); - let right_tailed_p_value = 1.0 - students_t_dist.cdf(t); - - trace!( - "t: {:?} df: {:?}, lv {:?}, rv {:?}", - t, - degrees_of_freedom, - left_tailed_p_value, - right_tailed_p_value - ); - if left_tailed_p_value <= p_value { - TTestResult::X1LessThanX2(left_tailed_p_value) - } else if right_tailed_p_value <= p_value { - TTestResult::X1GreaterThanX2(right_tailed_p_value) - } else { - TTestResult::FailedToRejectH0 - } -} -fn almost_equal(a: f64, b: f64, epsilon: f64) -> bool { - let difference = a - b; - difference.abs() < epsilon -} 
- -#[derive(Debug, Copy, Clone)] -pub enum TTestResult { - /// Contains p_value of left-tailed students t test - X1GreaterThanX2(f64), - FailedToRejectH0, - /// Contains p_value of right-tailed students t test - X1LessThanX2(f64), -} - -impl Eq for TTestResult {} -impl PartialEq for TTestResult { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::X1GreaterThanX2(l0), Self::X1GreaterThanX2(r0)) => { - almost_equal(*l0, *r0, P_VALUE_EPSILON) - } - (Self::FailedToRejectH0, Self::FailedToRejectH0) => true, - (Self::X1LessThanX2(l0), Self::X1LessThanX2(r0)) => { - almost_equal(*l0, *r0, P_VALUE_EPSILON) - } - _ => false, - } - } -} - -impl Display for TTestResult { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let s = match self { - TTestResult::X1GreaterThanX2(p) => ('+', p), - TTestResult::FailedToRejectH0 => { - return write!(f, "?"); - } - TTestResult::X1LessThanX2(p) => ('-', p), - }; - write!(f, "{} (p={:5.3})", s.0, s.1) - } -} - -#[cfg(test)] -mod tests { - - use crate::stats::TTestResult; - - use super::two_sample_t_test; - - #[test] - fn test_two_sample_t_test() { - // Test cases done using comparisons with results from wolframalpha - // https://www.wolframalpha.com/input?i=t+test&assumption=%7B%22F%22%2C+%22TTest%22%2C+%22xbar%22%7D+-%3E%221%22&assumption=%7B%22F%22%2C+%22TTest%22%2C+%22mu0%22%7D+-%3E%220%22&assumption=%7B%22F%22%2C+%22TTest%22%2C+%22s%22%7D+-%3E%2210%22&assumption=%7B%22F%22%2C+%22TTest%22%2C+%22n%22%7D+-%3E%2250%22 - - assert_eq!( - two_sample_t_test(100.0, 105.0, 5.0, 7.0, 5, 0.05), - TTestResult::FailedToRejectH0 - ); - - assert_eq!( - two_sample_t_test(100.0, 105.0, 5.0, 7.0, 25, 0.05), - TTestResult::X1LessThanX2(0.002869) - ); - assert_eq!( - two_sample_t_test(100.0, 97.0, 5.0, 7.0, 50, 0.05), - TTestResult::X1GreaterThanX2(0.007794) - ); - } -} diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index fcc70c0f63..69d0546cfa 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -1,285 +1,8 @@ -use std::{ - time::{Instant, Duration}, - collections::HashMap, -}; - -use async_channel::{Receiver, Sender}; -use tracing::debug; -use anyhow::Result; - -use crate::{BenchmarkError, benchmark_config::BenchmarkConfig}; -use crate::stats::AllStatsSync; - -// We expect every message produced to be read number_of_consumers_per_partition times. -// We also expect a total of num_producers_per_batch * num_records_per_batch unique messages. 
-#[derive(Default)] -pub struct BatchStats { - collected_records: HashMap, - pub last_flush_time: Option, -} -impl BatchStats { - pub fn record_sent( - &mut self, - hash: u64, - send_time: Instant, - num_bytes: u64, - ) -> Result<(), BenchmarkError> { - let val = self.collected_records.entry(hash).or_default(); - val.mark_send_time(send_time, num_bytes) - } - - pub fn record_recv( - &mut self, - hash: u64, - recv_time: Instant, - consumer_id: u64, - ) -> Result<(), BenchmarkError> { - let val = self.collected_records.entry(hash).or_default(); - val.mark_recv_time(recv_time, consumer_id) - } - - pub fn flush_recv(&mut self, flush_time: Instant) { - if let Some(previous) = self.last_flush_time { - if flush_time > previous { - self.last_flush_time = Some(flush_time); - } - } else { - self.last_flush_time = Some(flush_time) - } - } - - pub fn iter(&self) -> std::collections::hash_map::Values { - self.collected_records.values() - } -} - -pub struct StatsWorker { - receiver: Receiver, - current_batch: BatchStats, - config: BenchmarkConfig, - tx_stop_consume: Vec>, - all_stats: AllStatsSync, -} - -impl StatsWorker { - pub fn new( - tx_stop_consume: Vec>, - receiver: Receiver, - config: BenchmarkConfig, - all_stats: AllStatsSync, - ) -> Self { - Self { - receiver, - current_batch: BatchStats::default(), - config, - tx_stop_consume, - all_stats, - } - } - - pub async fn collect_send_recv_messages(&mut self) -> Result<()> { - let num_produced = self.config.total_number_of_messages_produced_per_batch(); - let num_consumed = self.config.total_number_of_messages_produced_per_batch() - * self.config.number_of_expected_times_each_message_consumed(); - let num_flushed = self.config.num_concurrent_producer_workers; - let total_expected_messages = num_produced + num_consumed + num_flushed; - debug!( - "Stats listening for {num_produced} sent messages and {num_consumed} received messages" - ); - for _ in 0..total_expected_messages { - match self.receiver.recv().await { - Ok(message) => match message { - StatsCollectorMessage::MessageSent { - hash, - send_time, - num_bytes, - } => { - self.current_batch.record_sent(hash, send_time, num_bytes)?; - } - StatsCollectorMessage::MessageReceived => {} - StatsCollectorMessage::MessageHash { .. } => { - return Err(BenchmarkError::ErrorWithExplanation( - "Received unexpected message hash".to_string(), - ) - .into()); - } - StatsCollectorMessage::ProducerFlushed { flush_time } => { - self.current_batch.flush_recv(flush_time) - } - }, - Err(_) => { - return Err(BenchmarkError::ErrorWithExplanation( - "StatsCollectorChannelClosed".to_string(), - ) - .into()) - } - } - } - debug!("All expected messages sent and received"); - for tx in self.tx_stop_consume.iter() { - tx.send(()).await?; - } - Ok(()) - } - - pub async fn validate(&mut self) -> Result<()> { - let number_of_consumed_messages = self.config.total_number_of_messages_produced_per_batch() - * self.config.number_of_expected_times_each_message_consumed(); - for _ in 0..number_of_consumed_messages { - match self.receiver.recv().await { - Ok(message) => match message { - StatsCollectorMessage::MessageSent { .. 
} => { - return Err(BenchmarkError::ErrorWithExplanation( - "Received unexpected message sent".to_string(), - ) - .into()); - } - StatsCollectorMessage::MessageReceived => { - return Err(BenchmarkError::ErrorWithExplanation( - "Received unexpected message received".to_string(), - ) - .into()); - } - StatsCollectorMessage::MessageHash { - hash, - recv_time, - consumer_id, - } => { - self.current_batch - .record_recv(hash, recv_time, consumer_id)?; - } - StatsCollectorMessage::ProducerFlushed { .. } => { - return Err(BenchmarkError::ErrorWithExplanation( - "Received unexpected message flushed".to_string(), - ) - .into()); - } - }, - Err(_) => { - return Err(BenchmarkError::ErrorWithExplanation( - "StatsCollectorChannelClosed".to_string(), - ) - .into()) - } - } - } - - let expected_num_times_consumed = - self.config.number_of_expected_times_each_message_consumed(); - for value in self.current_batch.collected_records.values() { - value.validate(expected_num_times_consumed as usize)?; - } - debug!("Batch validated"); - - Ok(()) - } - - pub async fn compute_stats(&self) { - self.all_stats - .lock() - .await - .compute_stats(&self.config, &self.current_batch); - } - - pub fn new_batch(&mut self) { - self.current_batch = BatchStats::default(); - } -} - -pub enum StatsCollectorMessage { - MessageSent { - hash: u64, - send_time: Instant, - num_bytes: u64, - }, - ProducerFlushed { - flush_time: Instant, - }, - MessageReceived, - - MessageHash { - hash: u64, - recv_time: Instant, - consumer_id: u64, - }, -} +use std::sync::atomic::{AtomicBool, AtomicU64}; #[derive(Default)] -pub struct RecordMetadata { - pub num_bytes: Option, - pub send_time: Option, - pub first_received_time: Option, - pub last_received_time: Option, - pub receivers_list: Vec, -} -impl RecordMetadata { - pub fn mark_send_time( - &mut self, - send_time: Instant, - num_bytes: u64, - ) -> Result<(), BenchmarkError> { - if self.send_time.is_some() { - Err(BenchmarkError::ErrorWithExplanation( - "Message already marked as sent".to_string(), - )) - } else { - self.num_bytes = Some(num_bytes); - self.send_time = Some(send_time); - Ok(()) - } - } - - pub fn mark_recv_time( - &mut self, - recv_time: Instant, - receiving_consumer_id: u64, - ) -> Result<(), BenchmarkError> { - if self.receivers_list.contains(&receiving_consumer_id) { - Err(BenchmarkError::ErrorWithExplanation( - "Message received twice by same consumer".to_string(), - )) - } else { - if let Some(first_recv) = self.first_received_time { - if recv_time < first_recv { - self.first_received_time = Some(recv_time) - } - } else { - self.first_received_time = Some(recv_time) - } - if let Some(last_recv) = self.last_received_time { - if recv_time > last_recv { - self.last_received_time = Some(recv_time) - } - } else { - self.last_received_time = Some(recv_time) - } - self.receivers_list.push(receiving_consumer_id); - Ok(()) - } - } - - pub fn validate(&self, expected_num_times_consumed: usize) -> Result<(), BenchmarkError> { - if self.send_time.is_none() { - return Err(BenchmarkError::ErrorWithExplanation( - "Message was never marked as sent".to_string(), - )); - } - if self.receivers_list.len() != expected_num_times_consumed { - let err_message = format!( - "Message was expected to be received {} times but was only received {} times", - expected_num_times_consumed, - self.receivers_list.len() - ); - Err(BenchmarkError::ErrorWithExplanation(err_message)) - } else { - Ok(()) - } - } - pub fn first_recv_latency(&self) -> Duration { - self.first_received_time.expect("Invalid record") - 
self.send_time.expect("Invalid record") - } - - pub fn last_recv_latency(&self) -> Duration { - self.last_received_time.expect("Invalid record") - self.send_time.expect("Invalid record") - } +pub struct ProduceStat { + pub message_send: AtomicU64, + pub message_bytes: AtomicU64, + pub end: AtomicBool, }
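Note on the new stats path (illustrative, not part of the patch): the channel-based StatsWorker is replaced by a shared ProduceStat of atomic counters. Each ProducerWorker bumps message_send and message_bytes on every send, the driver samples the counters on a fixed interval to derive messages/sec and bytes/sec, and finally sets `end` so workers flush and stop. The sketch below shows that sampling pattern in isolation; the busy-loop producer thread is only a stand-in for the real Fluvio producer worker, and the record size mirrors the patch's default of 5000 bytes.

    use std::sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    };
    use std::{thread, time::{Duration, Instant}};

    // Mirror of the patch's ProduceStat: producers bump counters,
    // the driver samples them periodically to compute throughput.
    #[derive(Default)]
    struct ProduceStat {
        message_send: AtomicU64,
        message_bytes: AtomicU64,
        end: AtomicBool,
    }

    fn main() {
        let stat = Arc::new(ProduceStat::default());
        let record_size: u64 = 5000; // default record size used by the patch's config

        // Stand-in for ProducerWorker::send_batch(): loop until `end` is set.
        let producer_stat = stat.clone();
        let producer = thread::spawn(move || {
            while !producer_stat.end.load(Ordering::Relaxed) {
                // a real worker would call fluvio_producer.send(..) here
                producer_stat.message_bytes.fetch_add(record_size, Ordering::Relaxed);
                producer_stat.message_send.fetch_add(1, Ordering::Relaxed);
            }
        });

        // Stand-in for the driver's sampling loop: diff the counters over a window.
        for _ in 0..3 {
            let start_bytes = stat.message_bytes.load(Ordering::Relaxed);
            let start_msgs = stat.message_send.load(Ordering::Relaxed);
            let start = Instant::now();
            thread::sleep(Duration::from_secs(2));
            let elapsed_ms = start.elapsed().as_millis() as f64;

            let bytes = stat.message_bytes.load(Ordering::Relaxed) - start_bytes;
            let msgs = stat.message_send.load(Ordering::Relaxed) - start_msgs;
            let bytes_per_sec = bytes as f64 / elapsed_ms * 1000.0;
            let msgs_per_sec = (msgs as f64 / elapsed_ms * 1000.0).round();
            println!("{} msg/s, {:9.1} MB/s", msgs_per_sec, bytes_per_sec / 1_000_000.0);
        }

        stat.end.store(true, Ordering::Relaxed);
        producer.join().unwrap();
    }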