diff --git a/Cargo.lock b/Cargo.lock index dbd2e763d6..8e04b692b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,6 +2328,7 @@ dependencies = [ "fluvio", "fluvio-future", "futures-util", + "hdrhistogram", "humantime", "rand", "rand_xoshiro", @@ -2335,6 +2336,7 @@ dependencies = [ "serde", "serde_yaml", "thiserror 1.0.69", + "tokio", "tracing", ] diff --git a/crates/fluvio-benchmark/Cargo.toml b/crates/fluvio-benchmark/Cargo.toml index fa94c1c1b6..ce16e0f1a4 100644 --- a/crates/fluvio-benchmark/Cargo.toml +++ b/crates/fluvio-benchmark/Cargo.toml @@ -17,8 +17,10 @@ clap = { workspace = true, features = ["std","derive"] } derive_builder = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } +hdrhistogram = { workspace = true } rand = { workspace = true } rand_xoshiro = { workspace = true } +tokio = { workspace = true, features = ['macros'] } rayon = { workspace = true } serde = { workspace = true , features = ['derive'] } serde_yaml = { workspace = true } diff --git a/crates/fluvio-benchmark/src/config.rs b/crates/fluvio-benchmark/src/config.rs index f9d11d1f57..d3b824e826 100644 --- a/crates/fluvio-benchmark/src/config.rs +++ b/crates/fluvio-benchmark/src/config.rs @@ -7,9 +7,9 @@ use bytesize::ByteSize; use crate::utils; -const DEFAULT_BATCH_SIZE: ByteSize = ByteSize(16_384); +const DEFAULT_BATCH_SIZE: ByteSize = ByteSize::kib(16); const DEFAULT_QUEUE_SIZE: u64 = 10; -const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize(33_554_432); +const DEFAULT_MAX_REQUEST_SIZE: ByteSize = ByteSize::mib(32); const DEFAULT_LINGER: &str = "0ms"; const DEFAULT_SERVER_TIMEOUT: &str = "5000ms"; const DEFAULT_COMPRESSION: Compression = Compression::None; @@ -19,7 +19,7 @@ const DEFAULT_WORKER_TIMEOUT: &str = "3000s"; const DEFAULT_RECORD_KEY_ALLOCATION_STRATEGY: RecordKeyAllocationStrategy = RecordKeyAllocationStrategy::NoKey; const DEFAULT_NUM_PRODUCERS: u64 = 1; -const DEFAULT_RECORD_SIZE: ByteSize = ByteSize(5120); +const DEFAULT_RECORD_SIZE: ByteSize = ByteSize::kib(5); const DEFAULT_NUM_RECORDS: u64 = 10_000; const DEFAULT_PARTITIONS: u32 = 1; const DEFAULT_REPLICAS: u32 = 1; diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index b136f891fd..a77a91c744 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -1,9 +1,14 @@ use anyhow::Result; use async_channel::{unbounded, Receiver}; +use bytesize::ByteSize; use fluvio_future::{task::spawn, future::timeout, timer::sleep}; use fluvio::{metadata::topic::TopicSpec, FluvioAdmin}; -use crate::{config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector}; +use tokio::select; + +use crate::{ + utils, config::ProducerConfig, producer_worker::ProducerWorker, stats_collector::StatCollector, +}; pub struct ProducerBenchmark {} @@ -49,10 +54,16 @@ impl ProducerBenchmark { let mut workers_jh = Vec::new(); let (stat_sender, stat_receiver) = unbounded(); + let (latency_sender, latency_receiver) = unbounded(); // Set up producers for producer_id in 0..config.shared_config.load_config.num_producers { println!("starting up producer {}", producer_id); - let stat_collector = StatCollector::create(10000, stat_sender.clone()); + let stat_collector = StatCollector::create( + config.batch_size.as_u64(), + config.shared_config.load_config.num_records, + latency_sender.clone(), + stat_sender.clone(), + ); let (tx_control, rx_control) = unbounded(); let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?; let jh = spawn(timeout( @@ -64,20 +75,41 @@ impl ProducerBenchmark { tx_controls.push(tx_control); workers_jh.push(jh); } - println!("benchmark started"); - - // delay 1 seconds, so produce can start - sleep(std::time::Duration::from_secs(1)).await; - - while let Ok(stat) = stat_receiver.recv().await { - if stat.end { - break; + println!("Benchmark started"); + + loop { + select! { + hist = latency_receiver.recv() => { + if let Ok(hist) = hist { + let mut latency_yaml = String::new(); + latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency", + utils::nanos_to_ms_pritable(hist.mean() as u64), + utils::nanos_to_ms_pritable(hist.value_at_quantile(1.0)))); + for percentile in [0.5, 0.95, 0.99] { + latency_yaml.push_str(&format!( + ", {:.2}ms p{percentile:4.2}", + utils::nanos_to_ms_pritable(hist.value_at_quantile(percentile)), + )); + } + println!("{}", latency_yaml); + } + break; + } + stat_rx = stat_receiver.recv() => { + if let Ok(stat) = stat_rx { + // lantecy_receiver is finishing the benchmark now + //if stat.end { + // break; + //} + let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string(); + println!( + "{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency", + stat.total_records_send, stat.records_per_sec, human_readable_bytes, + utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max) + ); + } + } } - let human_readable_bytes = format!("{:9.1}mb/s", stat.bytes_per_sec / 1000000.0); - println!( - "total bytes send: {} | total message send: {} | message: per second: {}, bytes per sec: {}, ", - stat.total_bytes_send, stat.total_message_send, stat.message_per_sec,human_readable_bytes - ); } // Wait for all producers to finish diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs index 82d2bf0af5..62b4d2e774 100644 --- a/crates/fluvio-benchmark/src/producer_worker.rs +++ b/crates/fluvio-benchmark/src/producer_worker.rs @@ -68,15 +68,17 @@ impl ProducerWorker { for record in self.records_to_send.into_iter() { self.stat.start(); - self.fluvio_producer + let time = std::time::Instant::now(); + let send_out = self + .fluvio_producer .send(record.key, record.data.clone()) .await?; - self.stat.record_record_send(record.data.len() as u64).await; - } + self.stat.send_out((send_out, time)); + self.stat.add_record(record.data.len() as u64).await; + } self.fluvio_producer.flush().await?; self.stat.finish(); - println!("producer is done sending batch"); Ok(()) } diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index 279da73b5f..ff8003025b 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -1,48 +1,96 @@ -use std::time::Instant; +use std::{sync::Arc, time::Instant}; use async_channel::Sender; +use fluvio::ProduceOutput; +use fluvio_future::{sync::Mutex, task::spawn}; +use hdrhistogram::Histogram; #[derive(Debug)] pub(crate) struct ProducerStat { record_send: u64, record_bytes: u64, start_time: Instant, + output_tx: Sender<(ProduceOutput, Instant)>, + histogram: Arc>>, } impl ProducerStat { - pub(crate) fn new() -> Self { + pub(crate) fn new(num_records: u64, latency_sender: Sender>) -> Self { + let (output_tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>(); + let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::::new(2).unwrap())); + + ProducerStat::track_latency(num_records, latency_sender, rx, histogram.clone()); + Self { record_send: 0, record_bytes: 0, start_time: Instant::now(), + output_tx, + histogram, } } + fn track_latency( + num_records: u64, + latency_sender: Sender>, + rx: async_channel::Receiver<(ProduceOutput, Instant)>, + histogram: Arc>>, + ) { + spawn(async move { + while let Ok((send_out, time)) = rx.recv().await { + let hist = histogram.clone(); + let latency_sender = latency_sender.clone(); + spawn(async move { + let _o = send_out.wait().await.unwrap(); + let duration = time.elapsed(); + let mut hist = hist.lock().await; + hist.record(duration.as_nanos() as u64).expect("record"); + + if hist.len() >= num_records { + latency_sender.send(hist.clone()).await.expect("send"); + } + }); + } + }); + } + pub(crate) fn calcuate(&mut self) -> Stat { let elapse = self.start_time.elapsed().as_millis(); - let message_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round(); + let records_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round(); let bytes_per_sec = (self.record_bytes as f64 / elapse as f64) * 1000.0; + let hist = self.histogram.lock_blocking(); + let latency_avg = hist.mean() as u64; + let latency_max = hist.value_at_quantile(1.0); + Stat { - message_per_sec, + records_per_sec, bytes_per_sec, - total_bytes_send: self.record_bytes, - total_message_send: self.record_send, - end: false, + _total_bytes_send: self.record_bytes, + total_records_send: self.record_send, + latency_avg, + latency_max, + _end: false, } } pub(crate) fn set_current_time(&mut self) { self.start_time = Instant::now(); } + + pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) { + self.output_tx.try_send(out).expect("send out"); + } } pub(crate) struct Stat { - pub message_per_sec: f64, + pub records_per_sec: f64, pub bytes_per_sec: f64, - pub total_bytes_send: u64, - pub total_message_send: u64, - pub end: bool, + pub _total_bytes_send: u64, + pub total_records_send: u64, + pub latency_avg: u64, + pub latency_max: u64, + pub _end: bool, } pub(crate) struct StatCollector { @@ -53,9 +101,14 @@ pub(crate) struct StatCollector { } impl StatCollector { - pub(crate) fn create(batch_size: u64, sender: Sender) -> Self { + pub(crate) fn create( + batch_size: u64, + num_records: u64, + latency_sender: Sender>, + sender: Sender, + ) -> Self { Self { - current: ProducerStat::new(), + current: ProducerStat::new(num_records, latency_sender), batch_size, current_record: 0, sender, @@ -68,7 +121,11 @@ impl StatCollector { } } - pub(crate) async fn record_record_send(&mut self, bytes: u64) { + pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) { + self.current.send_out(out); + } + + pub(crate) async fn add_record(&mut self, bytes: u64) { self.current.record_send += 1; self.current.record_bytes += bytes; self.current_record += 1; @@ -84,11 +141,13 @@ impl StatCollector { pub(crate) fn finish(&mut self) { let end_record = Stat { - message_per_sec: 0.0, + records_per_sec: 0.0, bytes_per_sec: 0.0, - total_bytes_send: 0, - total_message_send: 0, - end: true, + _total_bytes_send: 0, + total_records_send: 0, + latency_avg: 0, + latency_max: 0, + _end: true, }; self.sender.try_send(end_record).expect("send end stats"); diff --git a/crates/fluvio-benchmark/src/utils.rs b/crates/fluvio-benchmark/src/utils.rs index d7d00ccd7c..abd14e419c 100644 --- a/crates/fluvio-benchmark/src/utils.rs +++ b/crates/fluvio-benchmark/src/utils.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use rand::{distributions::Alphanumeric, Rng}; use rand::{RngCore, SeedableRng}; use rand_xoshiro::Xoshiro256PlusPlus; @@ -44,3 +46,7 @@ pub fn generate_random_string_vec(num: usize, size: usize) -> Vec { random_strings } + +pub fn nanos_to_ms_pritable(nano: u64) -> f64 { + Duration::from_nanos(nano).as_secs_f64() * 1000.0 +}