Skip to content

Commit

Permalink
feat: add fluvio producer callback
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jan 20, 2025
1 parent 5aad19d commit 0b849a0
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 60 deletions.
8 changes: 6 additions & 2 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,21 @@ impl ProducerBenchmark {

let (stat_sender, stat_receiver) = unbounded();
let (latency_sender, latency_receiver) = unbounded();

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(
config.batch_size.as_u64(),
config.num_records,
latency_sender.clone(),
stat_sender.clone(),
event_receiver,
);
let (tx_control, rx_control) = unbounded();
let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
let worker =
ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender)
.await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
Expand Down
46 changes: 40 additions & 6 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::sync::Arc;

use anyhow::Result;

use async_channel::Sender;
use fluvio::{
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey,
TopicProducerConfigBuilder, TopicProducerPool,
dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceCompletionEvent,
ProducerCallback, SharedProducerCallback, RecordKey, TopicProducerConfigBuilder,
TopicProducerPool,
};
use futures_util::future::BoxFuture;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
Expand All @@ -13,16 +18,45 @@ use crate::{

const SHARED_KEY: &str = "shared_key";

// Example implementation of the ProducerCallback trait
#[derive(Debug)]
struct BenchmarkProducerCallback {
event_sender: Sender<ProduceCompletionEvent>,
}

impl BenchmarkProducerCallback {
pub fn new(event_sender: Sender<ProduceCompletionEvent>) -> Self {
Self { event_sender }
}
}

impl ProducerCallback<ProduceCompletionEvent> for BenchmarkProducerCallback {
fn finished(&self, event: ProduceCompletionEvent) -> BoxFuture<'_, anyhow::Result<()>> {
Box::pin(async {
self.event_sender.send(event).await?;
Ok(())
})
}
}

pub(crate) struct ProducerWorker {
fluvio_producer: TopicProducerPool,
records_to_send: Vec<BenchmarkRecord>,
stat: StatCollector,
}
impl ProducerWorker {
pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
pub(crate) async fn new(
id: u64,
config: ProducerConfig,
stat: StatCollector,
event_sender: Sender<ProduceCompletionEvent>,
) -> Result<Self> {
let fluvio = Fluvio::connect().await?;
let callback: SharedProducerCallback<ProduceCompletionEvent> =
Arc::new(BenchmarkProducerCallback::new(event_sender));

let fluvio_config = TopicProducerConfigBuilder::default()
.callback(callback)
.batch_size(config.batch_size.as_u64() as usize)
.batch_queue_size(config.queue_size as usize)
.max_request_size(config.max_request_size.as_u64() as usize)
Expand Down Expand Up @@ -61,13 +95,13 @@ impl ProducerWorker {

for record in self.records_to_send.into_iter() {
self.stat.start();
let time = std::time::Instant::now();
let send_out = self
//let time = std::time::Instant::now();
let _ = self
.fluvio_producer
.send(record.key, record.data.clone())
.await?;

self.stat.send_out((send_out, time));
//self.stat.send_out((send_out, time));
self.stat.add_record(record.data.len() as u64).await;
}
self.fluvio_producer.flush().await?;
Expand Down
67 changes: 31 additions & 36 deletions crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Instant};

use async_channel::Sender;
use fluvio::ProduceOutput;
use async_channel::{Receiver, Sender};
use fluvio::ProduceCompletionEvent;
use fluvio_future::{sync::Mutex, task::spawn};
use hdrhistogram::Histogram;

Expand All @@ -10,58 +10,60 @@ pub(crate) struct ProducerStat {
record_send: u64,
record_bytes: u64,
start_time: Instant,
output_tx: Sender<(ProduceOutput, Instant)>,
histogram: Arc<Mutex<Histogram<u64>>>,
}

impl ProducerStat {
pub(crate) fn new(num_records: u64, latency_sender: Sender<Histogram<u64>>) -> Self {
let (output_tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>();
pub(crate) fn new(
latency_sender: Sender<Histogram<u64>>,
event_receiver: Receiver<ProduceCompletionEvent>,
) -> Self {
let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap()));

ProducerStat::track_latency(num_records, latency_sender, rx, histogram.clone());
ProducerStat::track_latency(latency_sender, histogram.clone(), event_receiver);

Self {
record_send: 0,
record_bytes: 0,
start_time: Instant::now(),
output_tx,
histogram,
}
}

fn track_latency(
num_records: u64,
latency_sender: Sender<Histogram<u64>>,
rx: async_channel::Receiver<(ProduceOutput, Instant)>,
histogram: Arc<Mutex<Histogram<u64>>>,
event_receiver: Receiver<ProduceCompletionEvent>,
) {
spawn(async move {
while let Ok((send_out, time)) = rx.recv().await {
loop {
let hist = histogram.clone();
let latency_sender = latency_sender.clone();
//spawn(async move {
match send_out.wait().await {
Ok(_) => {
let duration = time.elapsed();
let mut hist = hist.lock().await;
hist.record(duration.as_nanos() as u64).expect("record");

if hist.len() >= num_records {
latency_sender.send(hist.clone()).await.expect("send");
match event_receiver.recv().await {
Ok(event) => {
let base = event.metadata.base_offset().await;
match base {
Ok(_base_offset) => {
let elapsed = event.created_at.elapsed();
let mut hist = hist.lock().await;
hist.record(elapsed.as_nanos() as u64).expect("record");
}
Err(err) => {
println!("received err: {:?}", err);
}
}
}
Err(err) => {
println!("error sending record: {}", err);
return;
//if closed send latency
Err(_e) => {
let hist = hist.lock().await;
latency_sender.send(hist.clone()).await.expect("send");
break;
}
}
//});
}
});
}

pub(crate) fn calcuate(&mut self) -> Stat {
pub(crate) fn calculate(&mut self) -> Stat {
let elapse = self.start_time.elapsed().as_millis();
let records_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round();
let bytes_per_sec = (self.record_bytes as f64 / elapse as f64) * 1000.0;
Expand All @@ -84,10 +86,6 @@ impl ProducerStat {
pub(crate) fn set_current_time(&mut self) {
self.start_time = Instant::now();
}

pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
self.output_tx.try_send(out).expect("send out");
}
}

pub(crate) struct Stat {
Expand All @@ -105,17 +103,18 @@ pub(crate) struct StatCollector {
batch_size: u64, // number of records before we calculate stats
current_record: u64, // how many records we have sent in current cycle
sender: Sender<Stat>,
//event_receiver: Receiver<ProduceCompletionEvent>,
}

impl StatCollector {
pub(crate) fn create(
batch_size: u64,
num_records: u64,
latency_sender: Sender<Histogram<u64>>,
sender: Sender<Stat>,
event_receiver: Receiver<ProduceCompletionEvent>,
) -> Self {
Self {
current: ProducerStat::new(num_records, latency_sender),
current: ProducerStat::new(latency_sender, event_receiver),
batch_size,
current_record: 0,
sender,
Expand All @@ -128,17 +127,13 @@ impl StatCollector {
}
}

pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
self.current.send_out(out);
}

pub(crate) async fn add_record(&mut self, bytes: u64) {
self.current.record_send += 1;
self.current.record_bytes += bytes;
self.current_record += 1;

if self.current_record >= self.batch_size {
let stat = self.current.calcuate();
let stat = self.current.calculate();
self.current_record = 0;
self.current.record_bytes = 0;
self.current.record_send = 0;
Expand Down
7 changes: 4 additions & 3 deletions crates/fluvio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ pub mod spu;
pub use error::FluvioError;
pub use config::FluvioConfig;
pub use producer::{
TopicProducerConfigBuilder, TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey,
ProduceOutput, FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy,
RetryStrategy, Partitioner, PartitionerConfig, ProducerError,
ProducerCallback, SharedProducerCallback, ProduceCompletionEvent, TopicProducerConfigBuilder,
TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey, ProduceOutput,
FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy, RetryStrategy,
Partitioner, PartitionerConfig, ProducerError,
};
#[cfg(feature = "smartengine")]
pub use producer::{SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData};
Expand Down
19 changes: 17 additions & 2 deletions crates/fluvio/src/producer/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};

use async_channel::Sender;
use async_lock::RwLock;
Expand Down Expand Up @@ -218,6 +218,16 @@ impl RecordAccumulator {
}
}

pub struct ProduceCompletionEvent {
pub created_at: Instant,
pub metadata: Arc<BatchMetadata>,
}

pub type SharedProducerCallback<T> = Arc<dyn ProducerCallback<T> + Send + Sync>;
pub trait ProducerCallback<T> {
fn finished(&self, item: T) -> BoxFuture<'_, anyhow::Result<()>>;
}

pub(crate) struct PushRecord {
pub(crate) future: FutureRecordMetadata,
}
Expand Down Expand Up @@ -276,6 +286,10 @@ impl ProducerBatch {
pub(crate) fn batch(self) -> Batch {
self.batch.into()
}

pub(crate) fn metadata(&self) -> Arc<BatchMetadata> {
self.batch_metadata.clone()
}
}

pub(crate) struct BatchEvents {
Expand Down Expand Up @@ -318,7 +332,8 @@ type ProduceResponseFuture = Shared<BoxFuture<'static, Arc<Result<ProduceRespons

/// A Future that resolves to pair `base_offset` and `error_code`, which effectively come from
/// [`PartitionProduceResponse`].
pub(crate) struct ProducePartitionResponseFuture {
#[derive(Debug, Clone)]
pub struct ProducePartitionResponseFuture {
inner: Either<(ProduceResponseFuture, usize), Option<(Offset, ErrorCode)>>,
}

Expand Down
8 changes: 7 additions & 1 deletion crates/fluvio/src/producer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use serde::{Serialize, Deserialize};

use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};

use super::accumulator::SharedProducerCallback;
use super::partitioning::SpecificPartitioner;
use super::ProduceCompletionEvent;

const DEFAULT_LINGER_MS: u64 = 100;
const DEFAULT_TIMEOUT_MS: u64 = 1500;
Expand Down Expand Up @@ -70,7 +72,7 @@ impl fmt::Debug for Box<dyn Partitioner + Send + Sync> {
/// Create this struct with [`TopicProducerConfigBuilder`].
///
/// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`].
#[derive(Debug, Builder)]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct TopicProducerConfig {
/// Maximum amount of bytes accumulated by the records before sending the batch.
Expand Down Expand Up @@ -118,6 +120,9 @@ pub struct TopicProducerConfig {

#[builder(default)]
pub(crate) smartmodules: Vec<SmartModuleInvocation>,

#[builder(setter(into, strip_option), default)]
pub(crate) callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
}

impl TopicProducerConfigBuilder {
Expand Down Expand Up @@ -177,6 +182,7 @@ impl Default for TopicProducerConfig {
isolation: default_isolation(),
delivery_semantic: default_delivery(),
smartmodules: vec![],
callback: None,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub use crate::producer::partitioning::{Partitioner, PartitionerConfig};
use self::accumulator::BatchEvents;
use self::accumulator::BatchHandler;
use self::accumulator::BatchesDeque;
pub use self::accumulator::SharedProducerCallback;
pub use self::accumulator::ProducerCallback;
pub use self::accumulator::ProduceCompletionEvent;
pub use self::config::{
TopicProducerConfigBuilder, TopicProducerConfig, TopicProducerConfigBuilderError,
DeliverySemantic, RetryPolicy, RetryStrategy,
Expand Down Expand Up @@ -75,6 +78,7 @@ where
batches_deque: Arc<BatchesDeque>,
batch_events: Arc<BatchEvents>,
client_metric: Arc<ClientMetrics>,
callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
}

impl ProducerPool {
Expand All @@ -84,6 +88,7 @@ impl ProducerPool {
spu_pool: Arc<S>,
batches: Arc<HashMap<PartitionId, BatchHandler>>,
client_metric: Arc<ClientMetrics>,
callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
) -> Self
where
S: SpuPool + Send + Sync + 'static,
Expand All @@ -103,6 +108,7 @@ impl ProducerPool {
batches_deque: batch_list.clone(),
batch_events: batch_events.clone(),
client_metric: client_metric.clone(),
callback: callback.clone(),
};

PartitionProducer::start(
Expand Down Expand Up @@ -271,6 +277,7 @@ where
batches_deque: BatchesDeque::shared(),
batch_events: BatchEvents::shared(),
client_metric: self.metrics.clone(),
callback: self.config.callback.clone(),
};

let _ = producer_pool
Expand Down Expand Up @@ -433,6 +440,7 @@ where
spu_pool.clone(),
Arc::new(record_accumulator.batches().await),
metrics.clone(),
config.callback.clone(),
);

Ok(Self {
Expand Down
Loading

0 comments on commit 0b849a0

Please sign in to comment.