Skip to content

Commit

Permalink
refactor: lazily compute the push body
Browse files Browse the repository at this point in the history
It allows to optimize the local workflow, as it remove all the
memmove caused by the body computation if there is no remote route.
  • Loading branch information
wyfo committed Jan 14, 2025
1 parent 2c889ba commit 46488ea
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 168 deletions.
62 changes: 28 additions & 34 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2151,40 +2151,34 @@ impl SessionInner {
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if destination != Locality::SessionLocal {
primitives.send_push(
Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
),
ext_tstamp: None,
ext_nodeid: push::ext::NodeIdType::DEFAULT,
payload: match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
primitives.send_push_lazy(
wire_expr.to_owned(),
push::ext::QoSType::new(priority.into(), congestion_control, is_express),
None,
push::ext::NodeIdType::DEFAULT,
|| match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
#[cfg(feature = "unstable")]
reliability,
Expand Down
38 changes: 34 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use std::{

use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
core::{ExprId, Reliability, WhatAmI, WireExpr, ZenohIdProto},
network::{
interest::{InterestId, InterestMode, InterestOptions},
Mapping, Push, Request, RequestId, Response, ResponseFinal,
push, Mapping, Push, Request, RequestId, Response, ResponseFinal,
},
zenoh::RequestBody,
zenoh::{PushBody, RequestBody},
};
use zenoh_sync::get_mut_unchecked;
use zenoh_task::TaskController;
Expand Down Expand Up @@ -207,6 +207,27 @@ pub struct Face {
}

impl Face {
pub(crate) fn send_push_lazy(
&self,
wire_expr: WireExpr,
qos: push::ext::QoSType,
ext_tstamp: Option<push::ext::TimestampType>,
ext_nodeid: push::ext::NodeIdType,
body: impl FnOnce() -> PushBody,
reliability: Reliability,
) {
route_data(
&self.tables,
&self.state,
wire_expr,
qos,
ext_tstamp,
ext_nodeid,
body,
reliability,
);
}

pub fn downgrade(&self) -> WeakFace {
WeakFace {
tables: Arc::downgrade(&self.tables),
Expand Down Expand Up @@ -388,7 +409,16 @@ impl Primitives for Face {

#[inline]
fn send_push(&self, msg: Push, reliability: Reliability) {
route_data(&self.tables, &self.state, msg, reliability);
route_data(
&self.tables,
&self.state,
msg.wire_expr,
msg.ext_qos,
msg.ext_tstamp,
msg.ext_nodeid,
move || msg.payload,
reliability,
);
}

fn send_request(&self, msg: Request) {
Expand Down
51 changes: 29 additions & 22 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use std::{collections::HashMap, sync::Arc};
use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr},
network::{
declare::{ext, SubscriberId},
Push,
},
network::{declare::SubscriberId, push::ext, Push},
zenoh::PushBody,
};
use zenoh_sync::get_mut_unchecked;
Expand Down Expand Up @@ -391,39 +388,47 @@ macro_rules! inc_stats {
pub fn route_data(
tables_ref: &Arc<TablesLock>,
face: &FaceState,
mut msg: Push,
wire_expr: WireExpr,
ext_qos: ext::QoSType,
ext_tstamp: Option<ext::TimestampType>,
ext_nodeid: ext::NodeIdType,
payload: impl FnOnce() -> PushBody,
reliability: Reliability,
) {
let tables = zread!(tables_ref.tables);
match tables
.get_mapping(face, &msg.wire_expr.scope, msg.wire_expr.mapping)
.get_mapping(face, &wire_expr.scope, wire_expr.mapping)
.cloned()
{
Some(prefix) => {
tracing::trace!(
"{} Route data for res {}{}",
face,
prefix.expr(),
msg.wire_expr.suffix.as_ref()
wire_expr.suffix.as_ref()
);
let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref());
let mut expr = RoutingExpr::new(&prefix, wire_expr.suffix.as_ref());

#[cfg(feature = "stats")]
let admin = expr.full_expr().starts_with("@/");
#[cfg(feature = "stats")]
let mut payload = payload();
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, rx, user, msg.payload)
inc_stats!(face, rx, user, payload);
} else {
inc_stats!(face, rx, admin, msg.payload)
inc_stats!(face, rx, admin, payload);
}

if tables.hat_code.ingress_filter(&tables, face, &mut expr) {
let res = Resource::get_resource(&prefix, expr.suffix);

let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id);
let route = get_data_route(&tables, face, &res, &mut expr, ext_nodeid.node_id);

if !route.is_empty() {
treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp);
#[cfg(not(feature = "stats"))]
let mut payload = payload();
treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp);

if route.len() == 1 {
let (outface, key_expr, context) = route.values().next().unwrap();
Expand All @@ -434,18 +439,20 @@ pub fn route_data(
drop(tables);
#[cfg(feature = "stats")]
if !admin {
inc_stats!(outface, tx, user, msg.payload)
inc_stats!(face, rx, user, payload);
inc_stats!(outface, tx, user, payload);
} else {
inc_stats!(outface, tx, admin, msg.payload)
inc_stats!(face, rx, admin, payload);
inc_stats!(outface, tx, admin, payload);
}

outface.primitives.send_push(
Push {
wire_expr: key_expr.into(),
ext_qos: msg.ext_qos,
ext_tstamp: msg.ext_tstamp,
ext_qos,
ext_tstamp,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: msg.payload,
payload,
},
reliability,
)
Expand All @@ -465,18 +472,18 @@ pub fn route_data(
for (outface, key_expr, context) in route {
#[cfg(feature = "stats")]
if !admin {
inc_stats!(outface, tx, user, msg.payload)
inc_stats!(outface, tx, user, payload)
} else {
inc_stats!(outface, tx, admin, msg.payload)
inc_stats!(outface, tx, admin, payload)
}

outface.primitives.send_push(
Push {
wire_expr: key_expr,
ext_qos: msg.ext_qos,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: context },
payload: msg.payload.clone(),
payload: payload.clone(),
},
reliability,
)
Expand All @@ -489,7 +496,7 @@ pub fn route_data(
tracing::error!(
"{} Route data with unknown scope {}!",
face,
msg.wire_expr.scope
wire_expr.scope
);
}
}
Expand Down
Loading

0 comments on commit 46488ea

Please sign in to comment.