diff --git a/pubsub/src/subscription.rs b/pubsub/src/subscription.rs index ea138c5c..f276c4e3 100644 --- a/pubsub/src/subscription.rs +++ b/pubsub/src/subscription.rs @@ -79,6 +79,23 @@ pub struct SubscriptionConfigToUpdate { pub retry_policy: Option, } +#[derive(Debug, Clone, Default)] +pub struct SubscribeConfig { + enable_multiple_subscriber: bool, + subscriber_config: SubscriberConfig, +} + +impl SubscribeConfig { + pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self { + self.enable_multiple_subscriber = v; + self + } + pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self { + self.subscriber_config = v; + self + } +} + #[derive(Debug, Clone)] pub struct ReceiveConfig { pub worker_count: usize, @@ -353,13 +370,25 @@ impl Subscription { /// Ok(()) /// } /// ``` - pub async fn subscribe(&self, opt: Option) -> Result { + pub async fn subscribe(&self, opt: Option) -> Result { let (tx, rx) = async_channel::unbounded::(); let cancel = CancellationToken::new(); // spawn a separate subscriber task for each connection in the pool - for _ in 0..self.pool_size() { - Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), tx.clone(), opt.clone()); + let opt = opt.unwrap_or_default(); + let subscribers = if opt.enable_multiple_subscriber { + self.pool_size() + } else { + 1 + }; + for _ in 0..subscribers { + Subscriber::start( + cancel.clone(), + self.fqsn.clone(), + self.subc.clone(), + tx.clone(), + Some(opt.subscriber_config.clone()), + ); } Ok(MessageStream { queue: rx, cancel }) @@ -584,7 +613,7 @@ mod tests { use crate::apiv1::publisher_client::PublisherClient; use crate::apiv1::subscriber_client::SubscriberClient; use crate::subscriber::ReceivedMessage; - use crate::subscription::{SeekTo, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate}; + use crate::subscription::{SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate}; const PROJECT_NAME: &str = "local-project"; const EMULATOR: &str = "localhost:8681"; @@ -966,12 +995,22 @@ mod tests { #[tokio::test] #[serial] - async fn test_subscribe() { + async fn test_subscribe_single_subscriber() { + test_subscribe(None).await + } + + #[tokio::test] + #[serial] + async fn test_subscribe_multiple_subscriber() { + test_subscribe(Some(SubscribeConfig::default().with_enable_multiple_subscriber(true))).await + } + + async fn test_subscribe(opt: Option) { let subscription = create_subscription(false).await; let received = Arc::new(Mutex::new(false)); let checking = received.clone(); let _handler = tokio::spawn(async move { - let mut iter = subscription.subscribe(None).await.unwrap(); + let mut iter = subscription.subscribe(opt).await.unwrap(); while let Some(message) = iter.next().await { *received.lock().unwrap() = true; let _ = message.ack().await;