diff --git a/Cargo.lock b/Cargo.lock index 557f616cc1..97ab552601 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2373,7 +2373,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.24.1" +version = "0.24.2" dependencies = [ "anyhow", "async-channel 1.9.0", diff --git a/crates/fluvio/src/consumer/stream.rs b/crates/fluvio/src/consumer/stream.rs index 623e46c57c..fbd9005ceb 100644 --- a/crates/fluvio/src/consumer/stream.rs +++ b/crates/fluvio/src/consumer/stream.rs @@ -1,3 +1,4 @@ +use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -12,6 +13,8 @@ use tracing::{info, warn}; use super::config::OffsetManagementStrategy; use super::{offset::OffsetLocalStore, StreamToServer}; +type OffsetFlushFuture<'a> = Pin> + Send + 'a>>; + /// Extension of [`Stream`] trait with offset management capabilities. pub trait ConsumerStream: Stream> + Unpin { /// Mark the offset of the last yelded record as committed. Depending on [`OffsetManagementStrategy`] @@ -19,7 +22,7 @@ pub trait ConsumerStream: Stream> + Unpin { fn offset_commit(&mut self) -> Result<(), ErrorCode>; /// Send the committed offset to the server. The method waits for the server's acknowledgment before it finishes. - fn offset_flush(&mut self) -> impl Future> + Send; + fn offset_flush(&mut self) -> OffsetFlushFuture<'_>; } pub struct MultiplePartitionConsumerStream { @@ -117,7 +120,7 @@ where self.get_mut().offset_commit() } - fn offset_flush(&mut self) -> impl Future> + Send { + fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { self.get_mut().offset_flush() } } @@ -129,8 +132,8 @@ impl> + Unpin> ConsumerStream self.offset_mngt.commit() } - fn offset_flush(&mut self) -> impl Future> + Send { - self.offset_mngt.flush() + fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { + Box::pin(self.offset_mngt.flush()) } } @@ -144,9 +147,9 @@ impl> + Unpin> ConsumerStream Ok(()) } - fn offset_flush(&mut self) -> impl Future> + Send { + fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { let futures: Vec<_> = self.offset_mgnts.iter().map(|p| p.flush()).collect(); - try_join_all(futures).map(|r| r.map(|_| ())) + Box::pin(try_join_all(futures).map(|r| r.map(|_| ()))) } }