From 21775b31ad9a5e260259f8f2f749b525147addf9 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Sun, 19 Jan 2025 21:53:01 -0300 Subject: [PATCH] feat: add fluvio producer callback --- .../src/producer_benchmark.rs | 55 ++-- .../fluvio-benchmark/src/producer_worker.rs | 55 ++-- .../fluvio-benchmark/src/stats_collector.rs | 265 ++++++++++-------- crates/fluvio/src/lib.rs | 7 +- crates/fluvio/src/producer/accumulator.rs | 19 +- crates/fluvio/src/producer/config.rs | 8 +- crates/fluvio/src/producer/mod.rs | 8 + .../fluvio/src/producer/partition_producer.rs | 24 +- crates/fluvio/src/producer/record.rs | 11 +- 9 files changed, 282 insertions(+), 170 deletions(-) diff --git a/crates/fluvio-benchmark/src/producer_benchmark.rs b/crates/fluvio-benchmark/src/producer_benchmark.rs index e7a0df0568..e5a9d92757 100644 --- a/crates/fluvio-benchmark/src/producer_benchmark.rs +++ b/crates/fluvio-benchmark/src/producer_benchmark.rs @@ -51,18 +51,19 @@ impl ProducerBenchmark { let mut workers_jh = Vec::new(); let (stat_sender, stat_receiver) = unbounded(); - let (latency_sender, latency_receiver) = unbounded(); + let (end_sender, end_receiver) = unbounded(); + // Set up producers for producer_id in 0..config.num_producers { + let (event_sender, event_receiver) = unbounded(); println!("starting up producer {}", producer_id); - let stat_collector = StatCollector::create( - config.batch_size.as_u64(), - config.num_records, - latency_sender.clone(), - stat_sender.clone(), - ); + let stat_collector = + StatCollector::create(end_sender.clone(), stat_sender.clone(), event_receiver); let (tx_control, rx_control) = unbounded(); - let worker = ProducerWorker::new(producer_id, config.clone(), stat_collector).await?; + //sleep(Duration::from_secs(1)).await; + let worker = + ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender) + .await?; let jh = spawn(timeout( config.worker_timeout, ProducerDriver::main_loop(rx_control, worker), @@ -76,35 +77,37 @@ impl ProducerBenchmark { loop { select! { - hist = latency_receiver.recv() => { - if let Ok(hist) = hist { + stat_rx = stat_receiver.recv() => { + if let Ok(stat) = stat_rx { + let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string(); + println!( + "{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency", + stat.record_send, stat.records_per_sec, human_readable_bytes, + utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max) + ); + } + } + end = end_receiver.recv() => { + if let Ok(end) = end { let mut latency_yaml = String::new(); latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency", - utils::nanos_to_ms_pritable(hist.mean() as u64), - utils::nanos_to_ms_pritable(hist.value_at_quantile(1.0)))); + utils::nanos_to_ms_pritable(end.histogram.mean() as u64), + utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(1.0)))); for percentile in [0.5, 0.95, 0.99] { latency_yaml.push_str(&format!( ", {:.2}ms p{percentile:4.2}", - utils::nanos_to_ms_pritable(hist.value_at_quantile(percentile)), + utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(percentile)), )); } println!("{}", latency_yaml); - } - break; - } - stat_rx = stat_receiver.recv() => { - if let Ok(stat) = stat_rx { - // lantecy_receiver is finishing the benchmark now - //if stat.end { - // break; - //} - let human_readable_bytes = ByteSize(stat.bytes_per_sec as u64).to_string(); + + let human_readable_bytes = ByteSize(end.bytes_per_sec).to_string(); println!( - "{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency", - stat.total_records_send, stat.records_per_sec, human_readable_bytes, - utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max) + "{} records sent, {} records/sec: ({}/sec) ", + end.total_records, end.records_per_sec, human_readable_bytes ); } + break; } } } diff --git a/crates/fluvio-benchmark/src/producer_worker.rs b/crates/fluvio-benchmark/src/producer_worker.rs index 0a0b1953d7..1bf33bb9a8 100644 --- a/crates/fluvio-benchmark/src/producer_worker.rs +++ b/crates/fluvio-benchmark/src/producer_worker.rs @@ -1,9 +1,14 @@ +use std::sync::Arc; + use anyhow::Result; +use async_channel::Sender; use fluvio::{ - dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, RecordKey, - TopicProducerConfigBuilder, TopicProducerPool, + dataplane::record::RecordData, DeliverySemantic, Fluvio, Isolation, ProduceCompletionEvent, + ProducerCallback, SharedProducerCallback, RecordKey, TopicProducerConfigBuilder, + TopicProducerPool, }; +use futures_util::future::BoxFuture; use crate::{ config::{ProducerConfig, RecordKeyAllocationStrategy}, @@ -13,16 +18,45 @@ use crate::{ const SHARED_KEY: &str = "shared_key"; +// Example implementation of the ProducerCallback trait +#[derive(Debug)] +struct BenchmarkProducerCallback { + event_sender: Sender, +} + +impl BenchmarkProducerCallback { + pub fn new(event_sender: Sender) -> Self { + Self { event_sender } + } +} + +impl ProducerCallback for BenchmarkProducerCallback { + fn finished(&self, event: ProduceCompletionEvent) -> BoxFuture<'_, anyhow::Result<()>> { + Box::pin(async { + self.event_sender.send(event).await?; + Ok(()) + }) + } +} + pub(crate) struct ProducerWorker { fluvio_producer: TopicProducerPool, records_to_send: Vec, stat: StatCollector, } impl ProducerWorker { - pub(crate) async fn new(id: u64, config: ProducerConfig, stat: StatCollector) -> Result { + pub(crate) async fn new( + id: u64, + config: ProducerConfig, + stat: StatCollector, + event_sender: Sender, + ) -> Result { let fluvio = Fluvio::connect().await?; + let callback: SharedProducerCallback = + Arc::new(BenchmarkProducerCallback::new(event_sender)); let fluvio_config = TopicProducerConfigBuilder::default() + .callback(callback) .batch_size(config.batch_size.as_u64() as usize) .batch_queue_size(config.queue_size as usize) .max_request_size(config.max_request_size.as_u64() as usize) @@ -39,16 +73,8 @@ impl ProducerWorker { let num_records = records_per_producer(id, config.num_producers, config.num_records); - println!("producer {} will send {} records", id, num_records); - let records_to_send = create_records(config.clone(), num_records, id); - println!( - "producer {} will send {} records", - id, - records_to_send.len() - ); - Ok(ProducerWorker { fluvio_producer, records_to_send, @@ -60,18 +86,15 @@ impl ProducerWorker { println!("producer is sending batch"); for record in self.records_to_send.into_iter() { - self.stat.start(); - let time = std::time::Instant::now(); - let send_out = self + self.stat.start().await; + let _ = self .fluvio_producer .send(record.key, record.data.clone()) .await?; - self.stat.send_out((send_out, time)); self.stat.add_record(record.data.len() as u64).await; } self.fluvio_producer.flush().await?; - self.stat.finish(); Ok(()) } diff --git a/crates/fluvio-benchmark/src/stats_collector.rs b/crates/fluvio-benchmark/src/stats_collector.rs index a4958a2d58..5ec65c0a49 100644 --- a/crates/fluvio-benchmark/src/stats_collector.rs +++ b/crates/fluvio-benchmark/src/stats_collector.rs @@ -1,162 +1,203 @@ -use std::{sync::Arc, time::Instant}; - -use async_channel::Sender; -use fluvio::ProduceOutput; -use fluvio_future::{sync::Mutex, task::spawn}; +use std::{ + sync::{atomic::AtomicU64, Arc}, + time::{Duration, Instant}, +}; + +use async_channel::{Receiver, Sender}; +use fluvio::ProduceCompletionEvent; +use fluvio_future::{sync::Mutex, task::spawn, timer::sleep}; use hdrhistogram::Histogram; -#[derive(Debug)] pub(crate) struct ProducerStat { - record_send: u64, - record_bytes: u64, - start_time: Instant, - output_tx: Sender<(ProduceOutput, Instant)>, - histogram: Arc>>, + stats: Arc, + start_time: Arc>>, +} + +pub struct AtomicStats { + record_send: AtomicU64, + record_bytes: AtomicU64, +} + +pub struct Stats { + pub record_send: u64, + pub record_bytes: u64, + pub records_per_sec: u64, + pub bytes_per_sec: u64, + pub latency_avg: u64, + pub latency_max: u64, +} + +pub struct EndProducerStat { + pub histogram: Histogram, + pub total_records: u64, + pub records_per_sec: u64, + pub bytes_per_sec: u64, } impl ProducerStat { - pub(crate) fn new(num_records: u64, latency_sender: Sender>) -> Self { - let (output_tx, rx) = async_channel::unbounded::<(ProduceOutput, Instant)>(); - let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::::new(2).unwrap())); + pub(crate) fn new( + end_sender: Sender, + stats_sender: Sender, + event_receiver: Receiver, + ) -> Self { + let start_time = Arc::new(Mutex::new(None)); + let stats = Arc::new(AtomicStats { + record_send: AtomicU64::new(0), + record_bytes: AtomicU64::new(0), + }); + + let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::::new(3).unwrap())); - ProducerStat::track_latency(num_records, latency_sender, rx, histogram.clone()); + ProducerStat::track_latency( + end_sender, + histogram.clone(), + event_receiver, + stats.clone(), + Arc::clone(&start_time), + ); + + ProducerStat::send_stats(histogram.clone(), stats_sender, stats.clone()) + .expect("send stats"); Self { - record_send: 0, - record_bytes: 0, - start_time: Instant::now(), - output_tx, - histogram, + stats: stats.clone(), + start_time, } } + fn send_stats( + histogram: Arc>>, + stats_sender: Sender, + stats: Arc, + ) -> Result<(), std::io::Error> { + spawn(async move { + loop { + let stats_sender = stats_sender.clone(); + let stats = Arc::clone(&stats); + let old_record_send = stats.record_send.load(std::sync::atomic::Ordering::Relaxed); + let old_record_bytes = stats + .record_bytes + .load(std::sync::atomic::Ordering::Relaxed); + sleep(Duration::from_secs(1)).await; + let new_record_send = stats.record_send.load(std::sync::atomic::Ordering::Relaxed); + let new_record_bytes = stats + .record_bytes + .load(std::sync::atomic::Ordering::Relaxed); + + if new_record_send == old_record_send { + continue; + } + + let records_delta = new_record_send - old_record_send; + let bytes_delta = new_record_bytes - old_record_bytes; + + let records_per_sec = records_delta; + let bytes_per_sec = bytes_delta / 1000; + + let hist = histogram.lock().await; + let latency_avg = hist.mean() as u64; + let latency_max = hist.value_at_quantile(1.0); + + stats_sender + .send(Stats { + record_send: records_delta, + record_bytes: bytes_delta, + records_per_sec, + bytes_per_sec, + latency_avg, + latency_max, + }) + .await + .expect("send stats"); + } + }); + Ok(()) + } + fn track_latency( - num_records: u64, - latency_sender: Sender>, - rx: async_channel::Receiver<(ProduceOutput, Instant)>, + end_sender: Sender, histogram: Arc>>, + event_receiver: Receiver, + stats: Arc, + start_time: Arc>>, ) { spawn(async move { - while let Ok((send_out, time)) = rx.recv().await { - let hist = histogram.clone(); - let latency_sender = latency_sender.clone(); - //spawn(async move { - match send_out.wait().await { - Ok(_) => { - let duration = time.elapsed(); + let hist = histogram.clone(); + while let Ok(event) = event_receiver.recv().await { + match event.metadata.base_offset().await { + Ok(_base_offset) => { + let elapsed = event.created_at.elapsed(); let mut hist = hist.lock().await; - hist.record(duration.as_nanos() as u64).expect("record"); - - if hist.len() >= num_records { - latency_sender.send(hist.clone()).await.expect("send"); - } + hist.record(elapsed.as_nanos() as u64).expect("record"); } Err(err) => { - println!("error sending record: {}", err); - return; + println!("received err: {:?}", err); } } - //}); } - }); - } - pub(crate) fn calcuate(&mut self) -> Stat { - let elapse = self.start_time.elapsed().as_millis(); - let records_per_sec = ((self.record_send as f64 / elapse as f64) * 1000.0).round(); - let bytes_per_sec = (self.record_bytes as f64 / elapse as f64) * 1000.0; - - let hist = self.histogram.lock_blocking(); - let latency_avg = hist.mean() as u64; - let latency_max = hist.value_at_quantile(1.0); - - Stat { - records_per_sec, - bytes_per_sec, - _total_bytes_send: self.record_bytes, - total_records_send: self.record_send, - latency_avg, - latency_max, - _end: false, - } + // send end + + let hist = hist.lock().await; + let start_time = start_time.lock().await.expect("start time"); + let elapsed = start_time.elapsed(); + + let records_per_sec = stats.record_send.load(std::sync::atomic::Ordering::Relaxed) + / (elapsed.as_millis() as u64 / 1000); + let bytes_per_sec = stats + .record_bytes + .load(std::sync::atomic::Ordering::Relaxed) + / (elapsed.as_millis() as u64 / 1000); + + let end = EndProducerStat { + histogram: hist.clone(), + total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed), + records_per_sec, + bytes_per_sec, + }; + end_sender.send(end).await.expect("send end"); + }); } - pub(crate) fn set_current_time(&mut self) { - self.start_time = Instant::now(); + pub(crate) fn add_record(&mut self, bytes: u64) { + self.stats + .record_send + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.stats + .record_bytes + .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); } - pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) { - self.output_tx.try_send(out).expect("send out"); + pub(crate) async fn set_current_time(&mut self) { + self.start_time.lock().await.replace(Instant::now()); } } -pub(crate) struct Stat { - pub records_per_sec: f64, - pub bytes_per_sec: f64, - pub _total_bytes_send: u64, - pub total_records_send: u64, - pub latency_avg: u64, - pub latency_max: u64, - pub _end: bool, -} - pub(crate) struct StatCollector { current: ProducerStat, - batch_size: u64, // number of records before we calculate stats - current_record: u64, // how many records we have sent in current cycle - sender: Sender, + current_record: u64, } impl StatCollector { pub(crate) fn create( - batch_size: u64, - num_records: u64, - latency_sender: Sender>, - sender: Sender, + end_sender: Sender, + stat_sender: Sender, + event_receiver: Receiver, ) -> Self { Self { - current: ProducerStat::new(num_records, latency_sender), - batch_size, + current: ProducerStat::new(end_sender, stat_sender, event_receiver), current_record: 0, - sender, } } - pub(crate) fn start(&mut self) { + pub(crate) async fn start(&mut self) { if self.current_record == 0 { - self.current.set_current_time(); + self.current.set_current_time().await; } } - pub(crate) fn send_out(&mut self, out: (ProduceOutput, Instant)) { - self.current.send_out(out); - } - pub(crate) async fn add_record(&mut self, bytes: u64) { - self.current.record_send += 1; - self.current.record_bytes += bytes; + self.current.add_record(bytes); self.current_record += 1; - - if self.current_record >= self.batch_size { - let stat = self.current.calcuate(); - self.current_record = 0; - self.current.record_bytes = 0; - self.current.record_send = 0; - self.sender.try_send(stat).expect("send stats"); - } - } - - pub(crate) fn finish(&mut self) { - let end_record = Stat { - records_per_sec: 0.0, - bytes_per_sec: 0.0, - _total_bytes_send: 0, - total_records_send: 0, - latency_avg: 0, - latency_max: 0, - _end: true, - }; - - self.sender.try_send(end_record).expect("send end stats"); } } diff --git a/crates/fluvio/src/lib.rs b/crates/fluvio/src/lib.rs index be77bb9d34..83cd54cd88 100644 --- a/crates/fluvio/src/lib.rs +++ b/crates/fluvio/src/lib.rs @@ -19,9 +19,10 @@ pub mod spu; pub use error::FluvioError; pub use config::FluvioConfig; pub use producer::{ - TopicProducerConfigBuilder, TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey, - ProduceOutput, FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy, - RetryStrategy, Partitioner, PartitionerConfig, ProducerError, + ProducerCallback, SharedProducerCallback, ProduceCompletionEvent, TopicProducerConfigBuilder, + TopicProducerConfig, TopicProducer, TopicProducerPool, RecordKey, ProduceOutput, + FutureRecordMetadata, RecordMetadata, DeliverySemantic, RetryPolicy, RetryStrategy, + Partitioner, PartitionerConfig, ProducerError, }; #[cfg(feature = "smartengine")] pub use producer::{SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData}; diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs index 537910ad3a..46d1c42005 100644 --- a/crates/fluvio/src/producer/accumulator.rs +++ b/crates/fluvio/src/producer/accumulator.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_channel::Sender; use async_lock::RwLock; @@ -218,6 +218,16 @@ impl RecordAccumulator { } } +pub struct ProduceCompletionEvent { + pub created_at: Instant, + pub metadata: Arc, +} + +pub type SharedProducerCallback = Arc + Send + Sync>; +pub trait ProducerCallback { + fn finished(&self, item: T) -> BoxFuture<'_, anyhow::Result<()>>; +} + pub(crate) struct PushRecord { pub(crate) future: FutureRecordMetadata, } @@ -276,6 +286,10 @@ impl ProducerBatch { pub(crate) fn batch(self) -> Batch { self.batch.into() } + + pub(crate) fn metadata(&self) -> Arc { + self.batch_metadata.clone() + } } pub(crate) struct BatchEvents { @@ -318,7 +332,8 @@ type ProduceResponseFuture = Shared>, } diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs index 0fa1849d93..aae4390a3d 100644 --- a/crates/fluvio/src/producer/config.rs +++ b/crates/fluvio/src/producer/config.rs @@ -14,7 +14,9 @@ use serde::{Serialize, Deserialize}; use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner}; +use super::accumulator::SharedProducerCallback; use super::partitioning::SpecificPartitioner; +use super::ProduceCompletionEvent; const DEFAULT_LINGER_MS: u64 = 100; const DEFAULT_TIMEOUT_MS: u64 = 1500; @@ -70,7 +72,7 @@ impl fmt::Debug for Box { /// Create this struct with [`TopicProducerConfigBuilder`]. /// /// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`]. -#[derive(Debug, Builder)] +#[derive(Builder)] #[builder(pattern = "owned")] pub struct TopicProducerConfig { /// Maximum amount of bytes accumulated by the records before sending the batch. @@ -118,6 +120,9 @@ pub struct TopicProducerConfig { #[builder(default)] pub(crate) smartmodules: Vec, + + #[builder(setter(into, strip_option), default)] + pub(crate) callback: Option>, } impl TopicProducerConfigBuilder { @@ -177,6 +182,7 @@ impl Default for TopicProducerConfig { isolation: default_isolation(), delivery_semantic: default_delivery(), smartmodules: vec![], + callback: None, } } } diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs index de597693f2..27f3b6ba32 100644 --- a/crates/fluvio/src/producer/mod.rs +++ b/crates/fluvio/src/producer/mod.rs @@ -45,6 +45,9 @@ pub use crate::producer::partitioning::{Partitioner, PartitionerConfig}; use self::accumulator::BatchEvents; use self::accumulator::BatchHandler; use self::accumulator::BatchesDeque; +pub use self::accumulator::SharedProducerCallback; +pub use self::accumulator::ProducerCallback; +pub use self::accumulator::ProduceCompletionEvent; pub use self::config::{ TopicProducerConfigBuilder, TopicProducerConfig, TopicProducerConfigBuilderError, DeliverySemantic, RetryPolicy, RetryStrategy, @@ -75,6 +78,7 @@ where batches_deque: Arc, batch_events: Arc, client_metric: Arc, + callback: Option>, } impl ProducerPool { @@ -84,6 +88,7 @@ impl ProducerPool { spu_pool: Arc, batches: Arc>, client_metric: Arc, + callback: Option>, ) -> Self where S: SpuPool + Send + Sync + 'static, @@ -103,6 +108,7 @@ impl ProducerPool { batches_deque: batch_list.clone(), batch_events: batch_events.clone(), client_metric: client_metric.clone(), + callback: callback.clone(), }; PartitionProducer::start( @@ -271,6 +277,7 @@ where batches_deque: BatchesDeque::shared(), batch_events: BatchEvents::shared(), client_metric: self.metrics.clone(), + callback: self.config.callback.clone(), }; let _ = producer_pool @@ -433,6 +440,7 @@ where spu_pool.clone(), Arc::new(record_accumulator.batches().await), metrics.clone(), + config.callback.clone(), ); Ok(Self { diff --git a/crates/fluvio/src/producer/partition_producer.rs b/crates/fluvio/src/producer/partition_producer.rs index fab984842a..a55bdc7ef2 100644 --- a/crates/fluvio/src/producer/partition_producer.rs +++ b/crates/fluvio/src/producer/partition_producer.rs @@ -18,7 +18,7 @@ use fluvio_socket::VersionedSerialSocket; use crate::spu::SpuPool; use crate::TopicProducerConfig; -use super::{PartitionProducerParams, ProducerError}; +use super::{PartitionProducerParams, ProduceCompletionEvent, SharedProducerCallback, ProducerError}; use super::accumulator::{BatchEvents, BatchesDeque}; use super::event::EventHandler; @@ -34,6 +34,7 @@ where batch_events: Arc, last_error: Arc>>, metrics: Arc, + callback: Option>, } impl PartitionProducer @@ -53,6 +54,7 @@ where batch_events: params.batch_events, last_error, metrics: params.client_metric, + callback: params.callback, } } @@ -189,7 +191,7 @@ where ..Default::default() }; - let mut batch_notifiers = vec![]; + let mut batches_notify_and_metadata = vec![]; for p_batch in batches_ready { let mut partition_request = DefaultPartitionRequest { @@ -197,6 +199,7 @@ where ..Default::default() }; let notify = p_batch.notify.clone(); + let metadata = p_batch.metadata().clone(); let batch = p_batch.batch(); let raw_batch: Batch = batch.try_into()?; @@ -206,7 +209,7 @@ where producer_metrics.add_bytes(raw_batch.batch_len() as u64); partition_request.records.batches.push(raw_batch); - batch_notifiers.push(notify); + batches_notify_and_metadata.push((notify, metadata)); topic_request.partitions.push(partition_request); } @@ -217,12 +220,21 @@ where let (response, _) = self.send_to_socket(spu_socket, request).await?; - for (batch_notifier, partition_response_fut) in - batch_notifiers.into_iter().zip(response.into_iter()) + for ((notify, batch_metadata), partition_response_fut) in batches_notify_and_metadata + .into_iter() + .zip(response.into_iter()) { - if let Err(_e) = batch_notifier.send(partition_response_fut).await { + if let Err(_e) = notify.send(partition_response_fut.clone()).await { trace!("Failed to notify produce result because receiver was dropped"); } + + if let Some(callback) = self.callback.clone() { + let event = ProduceCompletionEvent { + created_at: batch_metadata.created_at, + metadata: batch_metadata.clone(), + }; + callback.finished(event).await.unwrap(); + } } Ok(()) diff --git a/crates/fluvio/src/producer/record.rs b/crates/fluvio/src/producer/record.rs index 9c262cf212..e194c7a6d5 100644 --- a/crates/fluvio/src/producer/record.rs +++ b/crates/fluvio/src/producer/record.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Instant; use async_channel::Receiver; use async_lock::RwLock; @@ -34,7 +35,7 @@ impl RecordMetadata { } /// Possible states of a batch in the accumulator -pub(crate) enum BatchMetadataState { +pub enum BatchMetadataState { /// The batch is buffered and ready to be sent to the SPU Buffered(Receiver), /// The batch was sent to the SPU. Base offset is known @@ -43,20 +44,22 @@ pub(crate) enum BatchMetadataState { Failed(ProducerError), } -pub(crate) struct BatchMetadata { - state: RwLock, +pub struct BatchMetadata { + pub state: RwLock, + pub created_at: Instant, } impl BatchMetadata { pub(crate) fn new(receiver: Receiver) -> Self { Self { state: RwLock::new(BatchMetadataState::Buffered(receiver)), + created_at: Instant::now(), } } /// Wait for the base offset of the batch. This is the offset of the first /// record in the batch and it is known once the batch is sent to the server. - pub(crate) async fn base_offset(&self) -> Result { + pub async fn base_offset(&self) -> Result { let mut state = self.state.write().await; match &*state { BatchMetadataState::Buffered(receiver) => {