Skip to content

Commit

Permalink
feat: add fluvio producer callback
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 20, 2025
1 parent 5aad19d commit 21775b3
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 170 deletions.
55 changes: 29 additions & 26 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,19 @@ impl ProducerBenchmark {
let mut workers_jh = Vec::new();

let (stat_sender, stat_receiver) = unbounded();
let (latency_sender, latency_receiver) = unbounded();
let (end_sender, end_receiver) = unbounded();

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(
config.batch_size.as_u64(),
config.num_records,
latency_sender.clone(),
stat_sender.clone(),
);
let stat_collector =
StatCollector::create(end_sender.clone(), stat_sender.clone(), event_receiver);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
//sleep(Duration::from_secs(1)).await;
let worker =
ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender)
.await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
Expand All @@ -76,35 +77,37 @@ impl ProducerBenchmark {

loop {
select! {
hist = latency_receiver.recv() => {
if let Ok(hist) = hist {
stat_rx = stat_receiver.recv() => {
if let Ok(stat) = stat_rx {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.record_send, stat.records_per_sec, human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
);
}
}
end = end_receiver.recv() => {
if let Ok(end) = end {
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency",
utils::nanos_to_ms_pritable(hist.mean() as u64),
utils::nanos_to_ms_pritable(hist.value_at_quantile(1.0))));
utils::nanos_to_ms_pritable(end.histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(1.0))));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {:.2}ms p{percentile:4.2}",
utils::nanos_to_ms_pritable(hist.value_at_quantile(percentile)),
utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(percentile)),
));
}
println!("{}", latency_yaml);
}
break;
}
stat_rx = stat_receiver.recv() => {
if let Ok(stat) = stat_rx {
// lantecy_receiver is finishing the benchmark now
//if stat.end {
// break;
//}
let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string();

let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string();
println!(
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.total_records_send, stat.records_per_sec, human_readable_bytes,
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
"{} records sent, {} records/sec: ({}/sec) ",
end.total_records, end.records_per_sec, human_readable_bytes
);
}
break;
}
}
}
Expand Down
55 changes: 39 additions & 16 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::sync::Arc;

use anyhow::Result;

use async_channel::Sender;
use fluvio::{
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey,
TopicProducerConfigBuilder, TopicProducerPool,
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceCompletionEvent,
ProducerCallback, SharedProducerCallback, RecordKey, TopicProducerConfigBuilder,
TopicProducerPool,
};
use futures_util::future::BoxFuture;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand All @@ -13,16 +18,45 @@ use crate::{

const SHARED_KEY: &str = "shared_key";

// Example implementation of the ProducerCallback trait
#[derive(Debug)]
struct BenchmarkProducerCallback {
event_sender: Sender<ProduceCompletionEvent>,
}

impl BenchmarkProducerCallback {
pub fn new(event_sender: Sender<ProduceCompletionEvent>) -> Self {
Self { event_sender }
}
}

impl ProducerCallback<ProduceCompletionEvent> for BenchmarkProducerCallback {
fn finished(&self, event: ProduceCompletionEvent) -> BoxFuture<'_, anyhow::Result<()>> {
Box::pin(async {
self.event_sender.send(event).await?;
Ok(())
})
}
}

pub(crate) struct ProducerWorker {
fluvio_producer: TopicProducerPool,
records_to_send: Vec<BenchmarkRecord>,
stat: StatCollector,
}
impl ProducerWorker {
pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
pub(crate) async fn new(
id: u64,
config: ProducerConfig,
stat: StatCollector,
event_sender: Sender<ProduceCompletionEvent>,
) -> Result<Self> {
let fluvio = Fluvio::connect().await?;
let callback: SharedProducerCallback<ProduceCompletionEvent> =
Arc::new(BenchmarkProducerCallback::new(event_sender));

let fluvio_config = TopicProducerConfigBuilder::default()
.callback(callback)
.batch_size(config.batch_size.as_u64() as usize)
.batch_queue_size(config.queue_size as usize)
.max_request_size(config.max_request_size.as_u64() as usize)
Expand All @@ -39,16 +73,8 @@ impl ProducerWorker {

let num_records = records_per_producer(id, config.num_producers, config.num_records);

println!("producer {} will send {} records", id, num_records);

let records_to_send = create_records(config.clone(), num_records, id);

println!(
"producer {} will send {} records",
id,
records_to_send.len()
);

Ok(ProducerWorker {
fluvio_producer,
records_to_send,
Expand All @@ -60,18 +86,15 @@ impl ProducerWorker {
println!("producer is sending batch");

for record in self.records_to_send.into_iter() {
self.stat.start();
let time = std::time::Instant::now();
let send_out = self
self.stat.start().await;
let _ = self
.fluvio_producer
.send(record.key, record.data.clone())
.await?;

self.stat.send_out((send_out, time));
self.stat.add_record(record.data.len() as u64).await;
}
self.fluvio_producer.flush().await?;
self.stat.finish();

Ok(())
}
Expand Down
Loading

0 comments on commit 21775b3

Please sign in to comment.