From 8272a8886022c0ecc676b0d5238e8ace6bbdec63 Mon Sep 17 00:00:00 2001 From: Jack Morrison Date: Tue, 26 Mar 2024 19:49:08 -0600 Subject: [PATCH 1/2] Queryable: Incrementally reply with alignments. --- .../src/replica/align_queryable.rs | 88 +++++++------------ 1 file changed, 30 insertions(+), 58 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index 7295367a06..71262ce625 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -19,6 +19,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::str; use std::str::FromStr; use zenoh::prelude::r#async::*; +use zenoh::queryable::Query; use zenoh::time::Timestamp; use zenoh::Session; @@ -89,87 +90,58 @@ impl AlignQueryable { diff_required ); if diff_required.is_some() { - let values = self.get_value(diff_required.unwrap()).await; - log::trace!("[ALIGN QUERYABLE] value for the query is {:?}", values); - for value in values { - match value { - AlignData::Interval(i, c) => { - let sample = Sample::new( - query.key_expr().clone(), - serde_json::to_string(&(i, c)).unwrap(), - ); - query.reply(Ok(sample)).res().await.unwrap(); - } - AlignData::Subinterval(i, c) => { - let sample = Sample::new( - query.key_expr().clone(), - serde_json::to_string(&(i, c)).unwrap(), - ); - query.reply(Ok(sample)).res().await.unwrap(); - } - AlignData::Content(i, c) => { - let sample = Sample::new( - query.key_expr().clone(), - serde_json::to_string(&(i, c)).unwrap(), - ); - query.reply(Ok(sample)).res().await.unwrap(); - } - AlignData::Data(k, (v, ts)) => { - let sample = Sample::new(k, v).with_timestamp(ts); - query.reply(Ok(sample)).res().await.unwrap(); - } - } - } + self.reply_diff(diff_required.unwrap(), query).await; } } } - async fn get_value(&self, diff_required: AlignComponent) -> Vec { + async fn reply_diff(&self, diff_required: AlignComponent, query: Query) { + let reply = |value| async { + let ke = query.key_expr().clone(); + let sample = match value { + AlignData::Content(i, c) => { + Sample::new(ke, serde_json::to_string(&(i, c)).unwrap()) + } + + AlignData::Subinterval(i, c) | AlignData::Interval(i, c) => { + Sample::new(ke, serde_json::to_string(&(i, c)).unwrap()) + } + AlignData::Data(k, (v, ts)) => Sample::new(k, v).with_timestamp(ts), + }; + query.reply(Ok(sample)).res().await.unwrap(); + }; + // TODO: Discuss if having timestamp is useful match diff_required { AlignComponent::Era(era) => { - let intervals = self.get_intervals(&era).await; - let mut result = Vec::new(); - for (i, c) in intervals { - result.push(AlignData::Interval(i, c)); + for (i, c) in self.get_intervals(&era).await { + reply(AlignData::Interval(i, c)).await; } - result } AlignComponent::Intervals(intervals) => { - let mut subintervals = HashMap::new(); for each in intervals { - subintervals.extend(self.get_subintervals(each).await); - } - let mut result = Vec::new(); - for (i, c) in subintervals { - result.push(AlignData::Subinterval(i, c)); + for (i, c) in self.get_subintervals(each).await { + reply(AlignData::Subinterval(i, c)).await; + } } - result } AlignComponent::Subintervals(subintervals) => { - let mut content = HashMap::new(); for each in subintervals { - content.extend(self.get_content(each).await); - } - let mut result = Vec::new(); - for (i, c) in content { - result.push(AlignData::Content(i, c)); + for (i, c) in self.get_content(each).await { + reply(AlignData::Content(i, c)).await; + } } - result } AlignComponent::Contents(contents) => { - let mut result = Vec::new(); for each in contents { - let entry = self.get_entry(&each).await; - if entry.is_some() { - let entry = entry.unwrap(); - result.push(AlignData::Data( + if let Some(entry) = self.get_entry(&each).await { + reply(AlignData::Data( OwnedKeyExpr::from(entry.key_expr), (entry.value, each.timestamp), - )); + )) + .await; } } - result } } } From cbd27176a8827ef5c8721c00da9e1782b2598df6 Mon Sep 17 00:00:00 2001 From: Jack Morrison Date: Sun, 24 Mar 2024 21:51:16 -0600 Subject: [PATCH 2/2] Zenoh Storage: Prevent unbounded growth of replica queue. --- plugins/zenoh-plugin-storage-manager/src/replica/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index b743a70451..7985392b9f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -119,7 +119,7 @@ impl Replica { // Create channels for communication between components // channel to queue digests to be aligned - let (tx_digest, rx_digest) = flume::unbounded(); + let (tx_digest, rx_digest) = flume::bounded(10); // channel for aligner to send missing samples to storage let (tx_sample, rx_sample) = flume::unbounded(); // channel for storage to send logging information back @@ -247,9 +247,12 @@ impl Replica { .await; if to_be_processed { log::trace!("[DIGEST_SUB] sending {} to aligner", digest.checksum); - match tx.send_async((from.to_string(), digest)).await { + match tx.try_send((from.to_string(), digest)) { Ok(()) => {} - Err(e) => log::error!("[DIGEST_SUB] Error sending digest to aligner: {}", e), + Err(e) => { + // Trace because this can happen _a lot_ on busy channels. + log::trace!("[DIGEST_SUB] Error sending digest to aligner: {}", e) + } } }; received.insert(from.to_string(), ts);