Skip to content

Commit

Permalink
chore: use box future (#4307)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Jan 2, 2025
1 parent 0680e60 commit e8a0411
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions crates/fluvio/src/consumer/stream.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_channel::Sender;
use fluvio_protocol::{link::ErrorCode, record::ConsumerRecord as Record};
use futures_util::future::BoxFuture;
use futures_util::stream::select_all;
use futures_util::{future::try_join_all, ready, Future, FutureExt};
use futures_util::{future::try_join_all, ready, FutureExt};
use futures_util::Stream;
use tracing::{info, warn};

use super::config::OffsetManagementStrategy;
use super::{offset::OffsetLocalStore, StreamToServer};

type OffsetFlushFuture<'a> = Pin<Box<dyn Future<Output = Result<(), ErrorCode>> + Send + 'a>>;

/// Extension of [`Stream`] trait with offset management capabilities.
pub trait ConsumerStream: Stream<Item = Result<Record, ErrorCode>> + Unpin {
/// Mark the offset of the last yelded record as committed. Depending on [`OffsetManagementStrategy`]
/// it may require a subsequent `offset_flush()` call to take any effect.
fn offset_commit(&mut self) -> Result<(), ErrorCode>;

/// Send the committed offset to the server. The method waits for the server's acknowledgment before it finishes.
fn offset_flush(&mut self) -> OffsetFlushFuture<'_>;
fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>;
}

pub struct MultiplePartitionConsumerStream<T> {
Expand Down Expand Up @@ -120,7 +118,7 @@ where
self.get_mut().offset_commit()
}

fn offset_flush(&mut self) -> OffsetFlushFuture<'_> {
fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
self.get_mut().offset_flush()
}
}
Expand All @@ -132,7 +130,7 @@ impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> ConsumerStream
self.offset_mngt.commit()
}

fn offset_flush(&mut self) -> OffsetFlushFuture<'_> {
fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
Box::pin(self.offset_mngt.flush())
}
}
Expand All @@ -147,7 +145,7 @@ impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> ConsumerStream
Ok(())
}

fn offset_flush(&mut self) -> OffsetFlushFuture<'_> {
fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
let futures: Vec<_> = self.offset_mgnts.iter().map(|p| p.flush()).collect();
Box::pin(try_join_all(futures).map(|r| r.map(|_| ())))
}
Expand Down

0 comments on commit e8a0411

Please sign in to comment.