From 0b849a0f9ae806ffb2d94ab5096cc455208e8a54 Mon Sep 17 00:00:00 2001
From: Felipe Cardozo
Date: Sun, 19 Jan 2025 21:53:01 -0300
Subject: [PATCH] feat: add fluvio producer callback

---
 .../src/producer_benchmark.rs                  |  8 ++-
 crates/fluvio-benchmark/src/producer_worker.rs | 46 +++++++++++--
 crates/fluvio-benchmark/src/stats_collector.rs | 67 +++++++++----------
 crates/fluvio/src/lib.rs                       |  7 +-
 crates/fluvio/src/producer/accumulator.rs      | 19 +++++-
 crates/fluvio/src/producer/config.rs           |  8 ++-
 crates/fluvio/src/producer/mod.rs              |  8 +++
 .../fluvio/src/producer/partition_producer.rs  | 24 +++++--
 crates/fluvio/src/producer/record.rs           | 11 +--
 9 files changed, 138 insertions(+), 60 deletions(-)

diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs
index e7a0df0568..d3fbafb463 100644
--- a/crates/fluvio-benchmark/src/producer_benchmark.rs
+++ b/crates/fluvio-benchmark/src/producer_benchmark.rs
@@ -52,17 +52,21 @@ impl ProducerBenchmark {
         let (stat_sender, stat_receiver) = unbounded();
         let (latency_sender, latency_receiver) = unbounded();
 
+        // Set up producers
         for producer_id in 0..config.num_producers {
+            let (event_sender, event_receiver) = unbounded();
             println!("starting up producer {}", producer_id);
             let stat_collector = StatCollector::create(
                 config.batch_size.as_u64(),
-                config.num_records,
                 latency_sender.clone(),
                 stat_sender.clone(),
+                event_receiver,
             );
             let (tx_control, rx_control) = unbounded();
-            let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?;
+            let worker =
+                ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender)
+                    .await?;
             let jh = spawn(timeout(
                 config.worker_timeout,
                 ProducerDriver::main_loop(rx_control, worker),
diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs
index 0a0b1953d7..07cf92f081 100644
--- a/crates/fluvio-benchmark/src/producer_worker.rs
+++ b/crates/fluvio-benchmark/src/producer_worker.rs
@@ -1,9 +1,14 @@
+use std::sync::Arc;
+
 use anyhow::Result;
+use async_channel::Sender;
 
 use fluvio::{
-    dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey,
-    TopicProducerConfigBuilder, TopicProducerPool,
+    dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceCompletionEvent,
+    ProducerCallback, SharedProducerCallback, RecordKey, TopicProducerConfigBuilder,
+    TopicProducerPool,
 };
+use futures_util::future::BoxFuture;
 
 use crate::{
     config::{ProducerConfig, RecordKeyAllocationStrategy},
@@ -13,16 +18,45 @@
 const SHARED_KEY: &str = "shared_key";
 
+// Example implementation of the ProducerCallback trait
+#[derive(Debug)]
+struct BenchmarkProducerCallback {
+    event_sender: Sender<ProduceCompletionEvent>,
+}
+
+impl BenchmarkProducerCallback {
+    pub fn new(event_sender: Sender<ProduceCompletionEvent>) -> Self {
+        Self { event_sender }
+    }
+}
+
+impl ProducerCallback<ProduceCompletionEvent> for BenchmarkProducerCallback {
+    fn finished(&self, event: ProduceCompletionEvent) -> BoxFuture<'_, anyhow::Result<()>> {
+        Box::pin(async {
+            self.event_sender.send(event).await?;
+            Ok(())
+        })
+    }
+}
+
 pub(crate) struct ProducerWorker {
     fluvio_producer: TopicProducerPool,
     records_to_send: Vec<BenchmarkRecord>,
     stat: StatCollector,
 }
 
 impl ProducerWorker {
-    pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result<Self> {
+    pub(crate) async fn new(
+        id: u64,
+        config: ProducerConfig,
+        stat: StatCollector,
+        event_sender: Sender<ProduceCompletionEvent>,
+    ) -> Result<Self> {
         let fluvio = Fluvio::connect().await?;
 
+        let callback: SharedProducerCallback<ProduceCompletionEvent> =
+            Arc::new(BenchmarkProducerCallback::new(event_sender));
         let fluvio_config = TopicProducerConfigBuilder::default()
+            .callback(callback)
             .batch_size(config.batch_size.as_u64() as usize)
             .batch_queue_size(config.queue_size as usize)
             .max_request_size(config.max_request_size.as_u64() as usize)
@@ -61,13 +95,13 @@
         for record in self.records_to_send.into_iter() {
             self.stat.start();
-            let time = std::time::Instant::now();
-            let send_out = self
+            //let time = std::time::Instant::now();
+            let _ = self
                 .fluvio_producer
                 .send(record.key, record.data.clone())
                 .await?;
-            self.stat.send_out((send_out, time));
+            //self.stat.send_out((send_out, time));
             self.stat.add_record(record.data.len() as u64).await;
         }
         self.fluvio_producer.flush().await?;
diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs
index a4958a2d58..348d7e520f 100644
--- a/crates/fluvio-benchmark/src/stats_collector.rs
+++ b/crates/fluvio-benchmark/src/stats_collector.rs
@@ -1,7 +1,7 @@
 use std::{sync::Arc, time::Instant};
 
-use async_channel::Sender;
-use fluvio::ProduceOutput;
+use async_channel::{Receiver, Sender};
+use fluvio::ProduceCompletionEvent;
 use fluvio_future::{sync::Mutex, task::spawn};
 use hdrhistogram::Histogram;
 
@@ -10,58 +10,60 @@ pub(crate) struct ProducerStat {
     record_send: u64,
     record_bytes: u64,
     start_time: Instant,
-    output_tx: Sender<(ProduceOutput, Instant)>,
     histogram: Arc<Mutex<Histogram<u64>>>,
 }
 
 impl ProducerStat {
-    pub(crate) fn new(num_records: u64, latency_sender: Sender<Histogram<u64>>) -> Self {
-        let (output_tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>();
+    pub(crate) fn new(
+        latency_sender: Sender<Histogram<u64>>,
+        event_receiver: Receiver<ProduceCompletionEvent>,
+    ) -> Self {
         let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(2).unwrap()));
 
-        ProducerStat::track_latency(num_records, latency_sender, rx, histogram.clone());
+        ProducerStat::track_latency(latency_sender, histogram.clone(), event_receiver);
 
         Self {
             record_send: 0,
             record_bytes: 0,
             start_time: Instant::now(),
-            output_tx,
             histogram,
         }
     }
 
     fn track_latency(
-        num_records: u64,
         latency_sender: Sender<Histogram<u64>>,
-        rx: async_channel::Receiver<(ProduceOutput, Instant)>,
         histogram: Arc<Mutex<Histogram<u64>>>,
+        event_receiver: Receiver<ProduceCompletionEvent>,
     ) {
         spawn(async move {
-            while let Ok((send_out, time)) = rx.recv().await {
+            loop {
                 let hist = histogram.clone();
-                let latency_sender = latency_sender.clone();
-                //spawn(async move {
-                match send_out.wait().await {
-                    Ok(_) => {
-                        let duration = time.elapsed();
-                        let mut hist = hist.lock().await;
-                        hist.record(duration.as_nanos() as u64).expect("record");
-
-                        if hist.len() >= num_records {
-                            latency_sender.send(hist.clone()).await.expect("send");
+                match event_receiver.recv().await {
+                    Ok(event) => {
+                        let base = event.metadata.base_offset().await;
+                        match base {
+                            Ok(_base_offset) => {
+                                let elapsed = event.created_at.elapsed();
+                                let mut hist = hist.lock().await;
+                                hist.record(elapsed.as_nanos() as u64).expect("record");
+                            }
+                            Err(err) => {
+                                println!("received err: {:?}", err);
+                            }
                         }
                     }
-                    Err(err) => {
-                        println!("error sending record: {}", err);
-                        return;
+                    // channel closed: all events received, send the latency histogram
+                    Err(_e) => {
+                        let hist = hist.lock().await;
+                        latency_sender.send(hist.clone()).await.expect("send");
+                        break;
                     }
                 }
-                //});
             }
         });
     }
 
-    pub(crate) fn calcuate(&mut self) -> Stat {
+    pub(crate) fn calculate(&mut self) -> Stat {
         let elapse = self.start_time.elapsed().as_millis();
         let records_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round();
         let bytes_per_sec = (self.record_bytes as f64 / elapse as f64) * 1000.0;
@@ -84,10 +86,6 @@ impl ProducerStat {
     pub(crate) fn set_current_time(&mut self) {
         self.start_time = Instant::now();
     }
-
-    pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
-        self.output_tx.try_send(out).expect("send out");
-    }
 }
 
 pub(crate) struct Stat {
@@ -105,17 +103,18 @@ pub(crate) struct StatCollector {
     batch_size: u64,     // number of records before we calculate stats
     current_record: u64, // how many records we have sent in current cycle
     sender: Sender<Stat>,
+    //event_receiver: Receiver<ProduceCompletionEvent>,
 }
 
 impl StatCollector {
     pub(crate) fn create(
         batch_size: u64,
-        num_records: u64,
         latency_sender: Sender<Histogram<u64>>,
         sender: Sender<Stat>,
+        event_receiver: Receiver<ProduceCompletionEvent>,
     ) -> Self {
         Self {
-            current: ProducerStat::new(num_records, latency_sender),
+            current: ProducerStat::new(latency_sender, event_receiver),
             batch_size,
             current_record: 0,
             sender,
@@ -128,17 +127,13 @@
         }
     }
 
-    pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) {
-        self.current.send_out(out);
-    }
-
     pub(crate) async fn add_record(&mut self, bytes: u64) {
         self.current.record_send += 1;
         self.current.record_bytes += bytes;
         self.current_record += 1;
 
         if self.current_record >= self.batch_size {
-            let stat = self.current.calcuate();
+            let stat = self.current.calculate();
             self.current_record = 0;
             self.current.record_bytes = 0;
             self.current.record_send = 0;
diff --git a/crates/fluvio/src/lib.rs b/crates/fluvio/src/lib.rs
index be77bb9d34..83cd54cd88 100644
--- a/crates/fluvio/src/lib.rs
+++ b/crates/fluvio/src/lib.rs
@@ -19,9 +19,10 @@ pub mod spu;
 pub use error::FluvioError;
 pub use config::FluvioConfig;
 pub use producer::{
-    TopicProducerConfigBuilder, TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey,
-    ProduceOutput, FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy,
-    RetryStrategy, Partitioner, PartitionerConfig, ProducerError,
+    ProducerCallback, SharedProducerCallback, ProduceCompletionEvent, TopicProducerConfigBuilder,
+    TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey, ProduceOutput,
+    FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy, RetryStrategy,
+    Partitioner, PartitionerConfig, ProducerError,
 };
 #[cfg(feature = "smartengine")]
 pub use producer::{SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData};
diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 537910ad3a..46d1c42005 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -3,7 +3,7 @@ use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use async_channel::Sender;
 use async_lock::RwLock;
@@ -218,6 +218,16 @@ impl RecordAccumulator {
     }
 }
 
+pub struct ProduceCompletionEvent {
+    pub created_at: Instant,
+    pub metadata: Arc<BatchMetadata>,
+}
+
+pub type SharedProducerCallback<T> = Arc<dyn ProducerCallback<T> + Send + Sync>;
+pub trait ProducerCallback<T> {
+    fn finished(&self, item: T) -> BoxFuture<'_, anyhow::Result<()>>;
+}
+
 pub(crate) struct PushRecord {
     pub(crate) future: FutureRecordMetadata,
 }
@@ -276,6 +286,10 @@ impl ProducerBatch {
     pub(crate) fn batch(self) -> Batch {
         self.batch.into()
     }
+
+    pub(crate) fn metadata(&self) -> Arc<BatchMetadata> {
+        self.batch_metadata.clone()
+    }
 }
 
 pub(crate) struct BatchEvents {
@@ -318,7 +332,8 @@ type ProduceResponseFuture = Shared>, }
diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs
index 0fa1849d93..aae4390a3d 100644
--- a/crates/fluvio/src/producer/config.rs
+++ b/crates/fluvio/src/producer/config.rs
@@ -14,7 +14,9 @@ use serde::{Serialize, Deserialize};
 
 use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};
 
+use super::accumulator::SharedProducerCallback;
 use super::partitioning::SpecificPartitioner;
+use super::ProduceCompletionEvent;
 
 const DEFAULT_LINGER_MS: u64 = 100;
 const DEFAULT_TIMEOUT_MS: u64 = 1500;
@@ -70,7 +72,7 @@ impl fmt::Debug for Box<dyn Partitioner + Send + Sync> {
 /// Create this struct with [`TopicProducerConfigBuilder`].
 ///
 /// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`].
-#[derive(Debug, Builder)]
+#[derive(Builder)]
 #[builder(pattern = "owned")]
 pub struct TopicProducerConfig {
     /// Maximum amount of bytes accumulated by the records before sending the batch.
@@ -118,6 +120,9 @@ pub struct TopicProducerConfig {
 
     #[builder(default)]
     pub(crate) smartmodules: Vec<SmartModuleInvocation>,
+
+    #[builder(setter(into, strip_option), default)]
+    pub(crate) callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
 }
 
 impl TopicProducerConfigBuilder {
@@ -177,6 +182,7 @@ impl Default for TopicProducerConfig {
             isolation: default_isolation(),
             delivery_semantic: default_delivery(),
             smartmodules: vec![],
+            callback: None,
         }
     }
 }
diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs
index de597693f2..27f3b6ba32 100644
--- a/crates/fluvio/src/producer/mod.rs
+++ b/crates/fluvio/src/producer/mod.rs
@@ -45,6 +45,9 @@ pub use crate::producer::partitioning::{Partitioner, PartitionerConfig};
 use self::accumulator::BatchEvents;
 use self::accumulator::BatchHandler;
 use self::accumulator::BatchesDeque;
+pub use self::accumulator::SharedProducerCallback;
+pub use self::accumulator::ProducerCallback;
+pub use self::accumulator::ProduceCompletionEvent;
 pub use self::config::{
     TopicProducerConfigBuilder, TopicProducerConfig, TopicProducerConfigBuilderError,
     DeliverySemantic, RetryPolicy, RetryStrategy,
@@ -75,6 +78,7 @@ where
     batches_deque: Arc<BatchesDeque>,
     batch_events: Arc<BatchEvents>,
     client_metric: Arc<ClientMetrics>,
+    callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
 }
 
 impl ProducerPool {
@@ -84,6 +88,7 @@ impl ProducerPool {
         spu_pool: Arc<S>,
         batches: Arc<HashMap<PartitionId, BatchHandler>>,
         client_metric: Arc<ClientMetrics>,
+        callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
     ) -> Self
     where
         S: SpuPool + Send + Sync + 'static,
     {
@@ -103,6 +108,7 @@ impl ProducerPool {
                 batches_deque: batch_list.clone(),
                 batch_events: batch_events.clone(),
                 client_metric: client_metric.clone(),
+                callback: callback.clone(),
             };
 
             PartitionProducer::start(
@@ -271,6 +277,7 @@ where
             batches_deque: BatchesDeque::shared(),
             batch_events: BatchEvents::shared(),
             client_metric: self.metrics.clone(),
+            callback: self.config.callback.clone(),
         };
 
         let _ = producer_pool
@@ -433,6 +440,7 @@ where
             spu_pool.clone(),
             Arc::new(record_accumulator.batches().await),
             metrics.clone(),
+            config.callback.clone(),
         );
 
         Ok(Self {
diff --git a/crates/fluvio/src/producer/partition_producer.rs b/crates/fluvio/src/producer/partition_producer.rs
index fab984842a..a55bdc7ef2 100644
--- a/crates/fluvio/src/producer/partition_producer.rs
+++ b/crates/fluvio/src/producer/partition_producer.rs
@@ -18,7 +18,7 @@ use fluvio_socket::VersionedSerialSocket;
 
 use crate::spu::SpuPool;
 use crate::TopicProducerConfig;
-use super::{PartitionProducerParams, ProducerError};
+use super::{PartitionProducerParams, ProduceCompletionEvent, SharedProducerCallback, ProducerError};
 
 use super::accumulator::{BatchEvents, BatchesDeque};
 use super::event::EventHandler;
@@ -34,6 +34,7 @@ where
     batch_events: Arc<BatchEvents>,
     last_error: Arc<RwLock<Option<ProducerError>>>,
     metrics: Arc<ClientMetrics>,
+    callback: Option<SharedProducerCallback<ProduceCompletionEvent>>,
 }
 
 impl PartitionProducer
 where
@@ -53,6 +54,7 @@ where
             batch_events: params.batch_events,
             last_error,
             metrics: params.client_metric,
+            callback: params.callback,
         }
     }
@@ -189,7 +191,7 @@ where
             ..Default::default()
         };
 
-        let mut batch_notifiers = vec![];
+        let mut batches_notify_and_metadata = vec![];
 
         for p_batch in batches_ready {
             let mut partition_request = DefaultPartitionRequest {
@@ -197,6 +199,7 @@ where
                 ..Default::default()
             };
             let notify = p_batch.notify.clone();
+            let metadata = p_batch.metadata().clone();
             let batch = p_batch.batch();
             let raw_batch: Batch = batch.try_into()?;
@@ -206,7 +209,7 @@ where
             producer_metrics.add_bytes(raw_batch.batch_len() as u64);
 
             partition_request.records.batches.push(raw_batch);
-            batch_notifiers.push(notify);
+            batches_notify_and_metadata.push((notify, metadata));
             topic_request.partitions.push(partition_request);
         }
 
@@ -217,12 +220,21 @@ where
 
         let (response, _) = self.send_to_socket(spu_socket, request).await?;
 
-        for (batch_notifier, partition_response_fut) in
-            batch_notifiers.into_iter().zip(response.into_iter())
+        for ((notify, batch_metadata), partition_response_fut) in batches_notify_and_metadata
+            .into_iter()
+            .zip(response.into_iter())
         {
-            if let Err(_e) = batch_notifier.send(partition_response_fut).await {
+            if let Err(_e) = notify.send(partition_response_fut.clone()).await {
                 trace!("Failed to notify produce result because receiver was dropped");
             }
+
+            if let Some(callback) = self.callback.clone() {
+                let event = ProduceCompletionEvent {
+                    created_at: batch_metadata.created_at,
+                    metadata: batch_metadata.clone(),
+                };
+                callback.finished(event).await.unwrap();
+            }
         }
 
         Ok(())
diff --git a/crates/fluvio/src/producer/record.rs b/crates/fluvio/src/producer/record.rs
index 9c262cf212..e194c7a6d5 100644
--- a/crates/fluvio/src/producer/record.rs
+++ b/crates/fluvio/src/producer/record.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::time::Instant;
 
 use async_channel::Receiver;
 use async_lock::RwLock;
@@ -34,7 +35,7 @@ impl RecordMetadata {
 }
 
 /// Possible states of a batch in the accumulator
-pub(crate) enum BatchMetadataState {
+pub enum BatchMetadataState {
     /// The batch is buffered and ready to be sent to the SPU
     Buffered(Receiver<ProducePartitionResponseFuture>),
     /// The batch was sent to the SPU. Base offset is known
@@ -43,20 +44,22 @@
     Failed(ProducerError),
 }
 
-pub(crate) struct BatchMetadata {
-    state: RwLock<BatchMetadataState>,
+pub struct BatchMetadata {
+    pub state: RwLock<BatchMetadataState>,
+    pub created_at: Instant,
 }
 
 impl BatchMetadata {
     pub(crate) fn new(receiver: Receiver<ProducePartitionResponseFuture>) -> Self {
         Self {
             state: RwLock::new(BatchMetadataState::Buffered(receiver)),
+            created_at: Instant::now(),
         }
     }
 
     /// Wait for the base offset of the batch. This is the offset of the first
     /// record in the batch and it is known once the batch is sent to the server.
-    pub(crate) async fn base_offset(&self) -> Result<Offset, ProducerError> {
+    pub async fn base_offset(&self) -> Result<Offset, ProducerError> {
         let mut state = self.state.write().await;
         match &*state {
             BatchMetadataState::Buffered(receiver) => {