diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index 6e9dad12ce..ed3d019950 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -19,17 +19,10 @@ use zenoh_buffers::{ ZBuf, }; use zenoh_protocol::{ - common::{ - iext, - imsg::{self, HEADER_BITS}, - ZExtZ64, - }, + common::{iext, imsg, ZExtZ64}, core::{ExprId, ExprLen, WireExpr}, network::{ - declare::{ - self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody, - DeclareMode, Interest, - }, + declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody}, id, Mapping, }, }; @@ -51,7 +44,6 @@ where DeclareBody::UndeclareQueryable(r) => self.write(&mut *writer, r)?, DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?, DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?, - DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?, DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?, } @@ -79,7 +71,6 @@ where U_QUERYABLE => DeclareBody::UndeclareQueryable(codec.read(&mut *reader)?), D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?), U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?), - D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?), D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; @@ -97,7 +88,7 @@ where fn write(self, writer: &mut W, x: &Declare) -> Self::Output { let Declare { - mode, + interest_id, ext_qos, ext_tstamp, ext_nodeid, @@ -106,13 +97,9 @@ where // Header let mut header = id::DECLARE; - header |= match mode { - DeclareMode::Push => 0b00, - DeclareMode::Response(_) => 0b01, - DeclareMode::Request(_) => 0b10, - DeclareMode::RequestContinuous(_) => 0b11, - } << HEADER_BITS; - + if x.interest_id.is_some() { + header |= declare::flag::I; + } let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); @@ -121,12 +108,8 @@ where } self.write(&mut *writer, header)?; - // Body - if let DeclareMode::Request(rid) - | DeclareMode::RequestContinuous(rid) - | DeclareMode::Response(rid) = mode - { - self.write(&mut *writer, rid)?; + if let Some(interest_id) = interest_id { + self.write(&mut *writer, interest_id)?; } // Extensions @@ -175,14 +158,10 @@ where return Err(DidntRead); } - // Body - let mode = match (self.header >> HEADER_BITS) & 0b11 { - 0b00 => DeclareMode::Push, - 0b01 => DeclareMode::Response(self.codec.read(&mut *reader)?), - 0b10 => DeclareMode::Request(self.codec.read(&mut *reader)?), - 0b11 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?), - _ => return Err(DidntRead), - }; + let mut interest_id = None; + if imsg::has_flag(self.header, declare::flag::I) { + interest_id = Some(self.codec.read(&mut *reader)?); + } // Extensions let mut ext_qos = declare::ext::QoSType::DEFAULT; @@ -219,7 +198,7 @@ where let body: DeclareBody = self.codec.read(&mut *reader)?; Ok(Declare { - mode, + interest_id, ext_qos, ext_tstamp, ext_nodeid, @@ -938,7 +917,7 @@ where // Extensions let mut ext_wire_expr = common::ext::WireExprType::null(); - let mut has_ext = imsg::has_flag(self.header, interest::flag::Z); + let mut has_ext = imsg::has_flag(self.header, token::flag::Z); while has_ext { let ext: u8 = self.codec.read(&mut *reader)?; let eodec = Zenoh080Header::new(ext); @@ -958,86 +937,6 @@ where } } -// DeclareInterest -impl WCodec<&interest::DeclareInterest, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output { - let interest::DeclareInterest { - interest: _, - wire_expr, - } = x; - - // Header - let header = declare::id::D_INTEREST; - self.write(&mut *writer, header)?; - - // Body - self.write(&mut *writer, x.options())?; - if let Some(we) = wire_expr.as_ref() { - self.write(&mut *writer, we)?; - } - - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - if imsg::mid(self.header) != declare::id::D_INTEREST { - return Err(DidntRead); - } - - // Body - let options: u8 = self.codec.read(&mut *reader)?; - let interest = Interest::from(options); - - let mut wire_expr = None; - if interest.restricted() { - let ccond = Zenoh080Condition::new(interest.named()); - let mut we: WireExpr<'static> = ccond.read(&mut *reader)?; - we.mapping = if interest.mapping() { - Mapping::Sender - } else { - Mapping::Receiver - }; - wire_expr = Some(we); - } - - // Extensions - let has_ext = imsg::has_flag(self.header, token::flag::Z); - if has_ext { - extension::skip_all(reader, "DeclareInterest")?; - } - - Ok(interest::DeclareInterest { - interest, - wire_expr, - }) - } -} - // WARNING: this is a temporary extension used for undeclarations impl WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080 where diff --git a/commons/zenoh-codec/src/network/interest.rs b/commons/zenoh-codec/src/network/interest.rs new file mode 100644 index 0000000000..9d1e64de76 --- /dev/null +++ b/commons/zenoh-codec/src/network/interest.rs @@ -0,0 +1,186 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Condition, Zenoh080Header}; +use zenoh_buffers::{ + reader::{DidntRead, Reader}, + writer::{DidntWrite, Writer}, +}; +use zenoh_protocol::{ + common::{ + iext, + imsg::{self, HEADER_BITS}, + }, + core::WireExpr, + network::{ + declare, id, + interest::{self, Interest, InterestMode, InterestOptions}, + Mapping, + }, +}; + +// Interest +impl WCodec<&Interest, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &Interest) -> Self::Output { + let Interest { + id, + mode, + options: _, // Compute the options on-the-fly according to Interest fields + wire_expr, + ext_qos, + ext_tstamp, + ext_nodeid, + } = x; + + // Header + let mut header = id::INTEREST; + header |= match mode { + InterestMode::Final => 0b00, + InterestMode::Current => 0b01, + InterestMode::Future => 0b10, + InterestMode::CurrentFuture => 0b11, + } << HEADER_BITS; + let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + + (ext_tstamp.is_some() as u8) + + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); + if n_exts != 0 { + header |= declare::flag::Z; + } + self.write(&mut *writer, header)?; + + self.write(&mut *writer, id)?; + + if *mode != InterestMode::Final { + self.write(&mut *writer, x.options())?; + if let Some(we) = wire_expr.as_ref() { + self.write(&mut *writer, we)?; + } + } + + // Extensions + if ext_qos != &declare::ext::QoSType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; + } + if let Some(ts) = ext_tstamp.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (ts, n_exts != 0))?; + } + if ext_nodeid != &declare::ext::NodeIdType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; + } + + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let header: u8 = self.read(&mut *reader)?; + let codec = Zenoh080Header::new(header); + + codec.read(reader) + } +} + +impl RCodec for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + if imsg::mid(self.header) != id::INTEREST { + return Err(DidntRead); + } + + let id = self.codec.read(&mut *reader)?; + let mode = match (self.header >> HEADER_BITS) & 0b11 { + 0b00 => InterestMode::Final, + 0b01 => InterestMode::Current, + 0b10 => InterestMode::Future, + 0b11 => InterestMode::CurrentFuture, + _ => return Err(DidntRead), + }; + + let mut options = InterestOptions::empty(); + let mut wire_expr = None; + if mode != InterestMode::Final { + let options_byte: u8 = self.codec.read(&mut *reader)?; + options = InterestOptions::from(options_byte); + if options.restricted() { + let ccond = Zenoh080Condition::new(options.named()); + let mut we: WireExpr<'static> = ccond.read(&mut *reader)?; + we.mapping = if options.mapping() { + Mapping::Sender + } else { + Mapping::Receiver + }; + wire_expr = Some(we); + } + } + + // Extensions + let mut ext_qos = declare::ext::QoSType::DEFAULT; + let mut ext_tstamp = None; + let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT; + + let mut has_ext = imsg::has_flag(self.header, declare::flag::Z); + while has_ext { + let ext: u8 = self.codec.read(&mut *reader)?; + let eodec = Zenoh080Header::new(ext); + match iext::eid(ext) { + declare::ext::QoS::ID => { + let (q, ext): (interest::ext::QoSType, bool) = eodec.read(&mut *reader)?; + ext_qos = q; + has_ext = ext; + } + declare::ext::Timestamp::ID => { + let (t, ext): (interest::ext::TimestampType, bool) = + eodec.read(&mut *reader)?; + ext_tstamp = Some(t); + has_ext = ext; + } + declare::ext::NodeId::ID => { + let (nid, ext): (interest::ext::NodeIdType, bool) = eodec.read(&mut *reader)?; + ext_nodeid = nid; + has_ext = ext; + } + _ => { + has_ext = extension::skip(reader, "Declare", ext)?; + } + } + } + + Ok(Interest { + id, + mode, + options, + wire_expr, + ext_qos, + ext_tstamp, + ext_nodeid, + }) + } +} diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index 3a227cd42a..5ebdb17b8e 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // mod declare; +mod interest; mod oam; mod push; mod request; @@ -45,6 +46,7 @@ where NetworkBody::Request(b) => self.write(&mut *writer, b), NetworkBody::Response(b) => self.write(&mut *writer, b), NetworkBody::ResponseFinal(b) => self.write(&mut *writer, b), + NetworkBody::Interest(b) => self.write(&mut *writer, b), NetworkBody::Declare(b) => self.write(&mut *writer, b), NetworkBody::OAM(b) => self.write(&mut *writer, b), } @@ -89,6 +91,7 @@ where id::REQUEST => NetworkBody::Request(self.read(&mut *reader)?), id::RESPONSE => NetworkBody::Response(self.read(&mut *reader)?), id::RESPONSE_FINAL => NetworkBody::ResponseFinal(self.read(&mut *reader)?), + id::INTEREST => NetworkBody::Interest(self.read(&mut *reader)?), id::DECLARE => NetworkBody::Declare(self.read(&mut *reader)?), id::OAM => NetworkBody::OAM(self.read(&mut *reader)?), _ => return Err(DidntRead), diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 31e8adcc6e..9a41f42e56 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -19,8 +19,6 @@ use crate::{ }; use alloc::borrow::Cow; pub use common::*; -use core::sync::atomic::AtomicU32; -pub use interest::*; pub use keyexpr::*; pub use queryable::*; pub use subscriber::*; @@ -33,59 +31,24 @@ pub mod flag { } /// Flags: -/// - |: Mode The mode of the the declaration* -/// -/ +/// - I: Interest If I==1 then interest_id is present +/// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|Mod| DECLARE | +/// |Z|X|I| DECLARE | /// +-+-+-+---------+ -/// ~ rid:z32 ~ if Mode != Push +/// ~interest_id:z32~ if I==1 /// +---------------+ /// ~ [decl_exts] ~ if Z==1 /// +---------------+ /// ~ declaration ~ /// +---------------+ /// -/// *Mode of declaration: -/// - Mode 0b00: Push -/// - Mode 0b01: Response -/// - Mode 0b10: Request -/// - Mode 0b11: RequestContinuous - -/// The resolution of a RequestId -pub type DeclareRequestId = u32; -pub type AtomicDeclareRequestId = AtomicU32; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum DeclareMode { - Push, - Request(DeclareRequestId), - RequestContinuous(DeclareRequestId), - Response(DeclareRequestId), -} - -impl DeclareMode { - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - - let mut rng = rand::thread_rng(); - - match rng.gen_range(0..4) { - 0 => DeclareMode::Push, - 1 => DeclareMode::Request(rng.gen()), - 2 => DeclareMode::RequestContinuous(rng.gen()), - 3 => DeclareMode::Response(rng.gen()), - _ => unreachable!(), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct Declare { - pub mode: DeclareMode, + pub interest_id: Option, pub ext_qos: ext::QoSType, pub ext_tstamp: Option, pub ext_nodeid: ext::NodeIdType, @@ -121,8 +84,6 @@ pub mod id { pub const D_TOKEN: u8 = 0x06; pub const U_TOKEN: u8 = 0x07; - pub const D_INTEREST: u8 = 0x08; - pub const D_FINAL: u8 = 0x1A; } @@ -136,7 +97,6 @@ pub enum DeclareBody { UndeclareQueryable(UndeclareQueryable), DeclareToken(DeclareToken), UndeclareToken(UndeclareToken), - DeclareInterest(DeclareInterest), DeclareFinal(DeclareFinal), } @@ -147,7 +107,7 @@ impl DeclareBody { let mut rng = rand::thread_rng(); - match rng.gen_range(0..10) { + match rng.gen_range(0..9) { 0 => DeclareBody::DeclareKeyExpr(DeclareKeyExpr::rand()), 1 => DeclareBody::UndeclareKeyExpr(UndeclareKeyExpr::rand()), 2 => DeclareBody::DeclareSubscriber(DeclareSubscriber::rand()), @@ -156,8 +116,7 @@ impl DeclareBody { 5 => DeclareBody::UndeclareQueryable(UndeclareQueryable::rand()), 6 => DeclareBody::DeclareToken(DeclareToken::rand()), 7 => DeclareBody::UndeclareToken(UndeclareToken::rand()), - 8 => DeclareBody::DeclareInterest(DeclareInterest::rand()), - 9 => DeclareBody::DeclareFinal(DeclareFinal::rand()), + 8 => DeclareBody::DeclareFinal(DeclareFinal::rand()), _ => unreachable!(), } } @@ -170,14 +129,16 @@ impl Declare { let mut rng = rand::thread_rng(); - let mode = DeclareMode::rand(); + let interest_id = rng + .gen_bool(0.5) + .then_some(rng.gen::()); let ext_qos = ext::QoSType::rand(); let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); let ext_nodeid = ext::NodeIdType::rand(); let body = DeclareBody::rand(); Self { - mode, + interest_id, ext_qos, ext_tstamp, ext_nodeid, @@ -197,7 +158,7 @@ pub mod common { /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ - /// |Z|x|x| D_FINAL | + /// |Z|X|X| D_FINAL | /// +---------------+ /// ~ [final_exts] ~ if Z==1 /// +---------------+ @@ -714,319 +675,3 @@ pub mod token { } } } - -pub mod interest { - use core::{ - fmt::{self, Debug}, - ops::{Add, AddAssign, Sub, SubAssign}, - }; - - use super::*; - - pub type InterestId = u32; - - pub mod flag { - // pub const X: u8 = 1 << 5; // 0x20 Reserved - // pub const X: u8 = 1 << 6; // 0x40 Reserved - pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow - } - - /// # DeclareInterest message - /// - /// The DECLARE INTEREST message is sent to request the transmission of current and optionally future - /// declarations of a given kind matching a target keyexpr. E.g., a declare interest could be - /// sent to request the transmisison of all current subscriptions matching `a/*`. - /// - /// The behaviour of a DECLARE INTEREST depends on the DECLARE MODE in the DECLARE MESSAGE: - /// - Push: invalid - /// - Request: only current declarations - /// - RequestContinous: current and future declarations - /// - Response: invalid - /// - /// E.g., the [`DeclareInterest`] message flow is the following for a Request: - /// - /// ```text - /// A B - /// | DECL INTEREST | - /// |------------------>| -- Sent in Declare::Request. - /// | | This is a DeclareInterest e.g. for subscriber declarations. - /// | | - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Response - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Response - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Response - /// | | - /// | FINAL | - /// |<------------------| -- Sent in Declare::Response - /// ``` - /// - /// - /// And the [`DeclareInterest`] message flow is the following for a RequestContinuous: - /// - /// ```text - /// A B - /// | DECL INTEREST | - /// |------------------>| -- Sent in Declare::RequestContinuous. - /// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations. - /// | | - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Push - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Push - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Push - /// | | - /// | FINAL | - /// |<------------------| -- Sent in Declare::Response - /// | | - /// | DECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Push. This is a new subscriber declaration. - /// | UNDECL SUBSCRIBER | - /// |<------------------| -- Sent in Declare::Push. This is a new subscriber undeclaration. - /// | | - /// | ... | - /// | | - /// | FINAL | - /// |------------------>| -- Sent in Declare::RequestContinuous. - /// | | This stops the transmission of subscriber declarations/undeclarations. - /// | | - /// ``` - /// - /// The DECLARE INTEREST message structure is defined as follows: - /// - /// ```text - /// Flags: - /// - X: Reserved - /// - X: Reserved - /// - Z: Extension If Z==1 then at least one extension is present - /// - /// 7 6 5 4 3 2 1 0 - /// +-+-+-+-+-+-+-+-+ - /// |Z|X|X| D_INT | - /// +---------------+ - /// |A|M|N|R|T|Q|S|K| (*) - /// +---------------+ - /// ~ key_scope:z16 ~ if R==1 - /// +---------------+ - /// ~ key_suffix ~ if R==1 && N==1 -- - /// +---------------+ - /// ~ [decl_exts] ~ if Z==1 - /// +---------------+ - /// - /// (*) - if K==1 then the interest refers to key expressions - /// - if S==1 then the interest refers to subscribers - /// - if Q==1 then the interest refers to queryables - /// - if T==1 then the interest refers to tokens - /// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions. - /// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0. - /// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver. - /// If R==0 then M should be set to 0. - /// - if A==1 then the replies SHOULD be aggregated - /// ``` - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct DeclareInterest { - pub interest: Interest, - pub wire_expr: Option>, - } - - impl DeclareInterest { - pub fn options(&self) -> u8 { - let mut interest = self.interest; - if let Some(we) = self.wire_expr.as_ref() { - interest += Interest::RESTRICTED; - if we.has_suffix() { - interest += Interest::NAMED; - } - if let Mapping::Sender = we.mapping { - interest += Interest::MAPPING; - } - } - interest.options - } - - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - - let wire_expr = rng.gen_bool(0.5).then_some(WireExpr::rand()); - let interest = Interest::rand(); - - Self { - wire_expr, - interest, - } - } - } - - #[derive(Clone, Copy)] - pub struct Interest { - options: u8, - } - - impl Interest { - // Flags - pub const KEYEXPRS: Interest = Interest::options(1); - pub const SUBSCRIBERS: Interest = Interest::options(1 << 1); - pub const QUERYABLES: Interest = Interest::options(1 << 2); - pub const TOKENS: Interest = Interest::options(1 << 3); - const RESTRICTED: Interest = Interest::options(1 << 4); - const NAMED: Interest = Interest::options(1 << 5); - const MAPPING: Interest = Interest::options(1 << 6); - pub const AGGREGATE: Interest = Interest::options(1 << 7); - pub const ALL: Interest = Interest::options( - Interest::KEYEXPRS.options - | Interest::SUBSCRIBERS.options - | Interest::QUERYABLES.options - | Interest::TOKENS.options, - ); - - const fn options(options: u8) -> Self { - Self { options } - } - - pub const fn empty() -> Self { - Self { options: 0 } - } - - pub const fn keyexprs(&self) -> bool { - imsg::has_flag(self.options, Self::KEYEXPRS.options) - } - - pub const fn subscribers(&self) -> bool { - imsg::has_flag(self.options, Self::SUBSCRIBERS.options) - } - - pub const fn queryables(&self) -> bool { - imsg::has_flag(self.options, Self::QUERYABLES.options) - } - - pub const fn tokens(&self) -> bool { - imsg::has_flag(self.options, Self::TOKENS.options) - } - - pub const fn restricted(&self) -> bool { - imsg::has_flag(self.options, Self::RESTRICTED.options) - } - - pub const fn named(&self) -> bool { - imsg::has_flag(self.options, Self::NAMED.options) - } - - pub const fn mapping(&self) -> bool { - imsg::has_flag(self.options, Self::MAPPING.options) - } - - pub const fn aggregate(&self) -> bool { - imsg::has_flag(self.options, Self::AGGREGATE.options) - } - - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - - let mut s = Self::empty(); - if rng.gen_bool(0.5) { - s += Interest::KEYEXPRS; - } - if rng.gen_bool(0.5) { - s += Interest::SUBSCRIBERS; - } - if rng.gen_bool(0.5) { - s += Interest::TOKENS; - } - if rng.gen_bool(0.5) { - s += Interest::AGGREGATE; - } - s - } - } - - impl PartialEq for Interest { - fn eq(&self, other: &Self) -> bool { - self.keyexprs() == other.keyexprs() - && self.subscribers() == other.subscribers() - && self.queryables() == other.queryables() - && self.tokens() == other.tokens() - && self.aggregate() == other.aggregate() - } - } - - impl Debug for Interest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Interest {{ ")?; - if self.keyexprs() { - write!(f, "K:Y, ")?; - } else { - write!(f, "K:N, ")?; - } - if self.subscribers() { - write!(f, "S:Y, ")?; - } else { - write!(f, "S:N, ")?; - } - if self.queryables() { - write!(f, "Q:Y, ")?; - } else { - write!(f, "Q:N, ")?; - } - if self.tokens() { - write!(f, "T:Y, ")?; - } else { - write!(f, "T:N, ")?; - } - if self.aggregate() { - write!(f, "A:Y")?; - } else { - write!(f, "A:N")?; - } - write!(f, " }}")?; - Ok(()) - } - } - - impl Eq for Interest {} - - impl Add for Interest { - type Output = Self; - - #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest - fn add(self, rhs: Self) -> Self::Output { - Self { - options: self.options | rhs.options, - } - } - } - - impl AddAssign for Interest { - #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest - fn add_assign(&mut self, rhs: Self) { - self.options |= rhs.options; - } - } - - impl Sub for Interest { - type Output = Self; - - fn sub(self, rhs: Self) -> Self::Output { - Self { - options: self.options & !rhs.options, - } - } - } - - impl SubAssign for Interest { - fn sub_assign(&mut self, rhs: Self) { - self.options &= !rhs.options; - } - } - - impl From for Interest { - fn from(options: u8) -> Self { - Self { options } - } - } -} diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs new file mode 100644 index 0000000000..e7eb75787e --- /dev/null +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -0,0 +1,383 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::{common::imsg, core::WireExpr, network::Mapping}; +use core::{ + fmt::{self, Debug}, + ops::{Add, AddAssign, Sub, SubAssign}, + sync::atomic::AtomicU32, +}; + +pub type InterestId = u32; + +pub mod flag { + pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow +} + +/// The INTEREST message is sent to request the transmission of current and optionally future +/// declarations of a given kind matching a target keyexpr. E.g., an interest could be +/// sent to request the transmisison of all current subscriptions matching `a/*`. +/// +/// The behaviour of a INTEREST depends on the INTEREST MODE. +/// +/// E.g., the message flow is the following for an [`Interest`] with mode `Current`: +/// +/// ```text +/// A B +/// | INTEREST | +/// |------------------>| -- Mode: Current +/// | | This is an Interest e.g. for subscriber declarations. +/// | | +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field set +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field set +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field set +/// | | +/// | DECL FINAL | +/// |<------------------| -- With interest_id field set +/// | | +/// ``` +/// +/// And the message flow is the following for an [`Interest`] with mode `CurrentFuture`: +/// +/// ```text +/// A B +/// | INTEREST | +/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. +/// | | +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | | +/// | DECL FINAL | +/// |<------------------| -- With interest_id field set +/// | | +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | UNDECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | | +/// | ... | +/// | | +/// | INTEREST FINAL | +/// |------------------>| -- Mode: Final +/// | | This stops the transmission of subscriber declarations/undeclarations. +/// | | +/// +/// Flags: +/// - |: Mode The mode of the interest* +/// -/ +/// - Z: Extension If Z==1 then at least one extension is present +/// +/// 7 6 5 4 3 2 1 0 +/// +-+-+-+-+-+-+-+-+ +/// |Z|Mod|INTEREST | +/// +-+-+-+---------+ +/// ~ id:z32 ~ +/// +---------------+ +/// |A|M|N|R|T|Q|S|K| if Mod!=Final (*) +/// +---------------+ +/// ~ key_scope:z16 ~ if Mod!=Final && R==1 +/// +---------------+ +/// ~ key_suffix ~ if Mod!=Final && R==1 && N==1 -- +/// +---------------+ +/// ~ [int_exts] ~ if Z==1 +/// +---------------+ +/// +/// *Mode of declaration: +/// - Mode 0b00: Final +/// - Mode 0b01: Current +/// - Mode 0b10: Future +/// - Mode 0b11: CurrentFuture +/// +/// (*) - if K==1 then the interest refers to key expressions +/// - if S==1 then the interest refers to subscribers +/// - if Q==1 then the interest refers to queryables +/// - if T==1 then the interest refers to tokens +/// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions. +/// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0. +/// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver. +/// If R==0 then M should be set to 0. +/// - if A==1 then the replies SHOULD be aggregated +/// ``` + +/// The resolution of a RequestId +pub type DeclareRequestId = u32; +pub type AtomicDeclareRequestId = AtomicU32; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InterestMode { + Final, + Current, + Future, + CurrentFuture, +} + +impl InterestMode { + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + match rng.gen_range(0..4) { + 0 => InterestMode::Final, + 1 => InterestMode::Current, + 2 => InterestMode::Future, + 3 => InterestMode::CurrentFuture, + _ => unreachable!(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Interest { + pub id: InterestId, + pub mode: InterestMode, + pub options: InterestOptions, + pub wire_expr: Option>, + pub ext_qos: ext::QoSType, + pub ext_tstamp: Option, + pub ext_nodeid: ext::NodeIdType, +} + +pub mod ext { + use crate::{ + common::{ZExtZ64, ZExtZBuf}, + zextz64, zextzbuf, + }; + + pub type QoS = zextz64!(0x1, false); + pub type QoSType = crate::network::ext::QoSType<{ QoS::ID }>; + + pub type Timestamp = zextzbuf!(0x2, false); + pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>; + + pub type NodeId = zextz64!(0x3, true); + pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>; +} + +impl Interest { + pub fn options(&self) -> u8 { + let mut interest = self.options; + if let Some(we) = self.wire_expr.as_ref() { + interest += InterestOptions::RESTRICTED; + if we.has_suffix() { + interest += InterestOptions::NAMED; + } + if let Mapping::Sender = we.mapping { + interest += InterestOptions::MAPPING; + } + } + interest.options + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + + let id = rng.gen::(); + let mode = InterestMode::rand(); + let options = InterestOptions::rand(); + let wire_expr = rng.gen_bool(0.5).then_some(WireExpr::rand()); + let ext_qos = ext::QoSType::rand(); + let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); + let ext_nodeid = ext::NodeIdType::rand(); + + Self { + id, + mode, + wire_expr, + options, + ext_qos, + ext_tstamp, + ext_nodeid, + } + } +} + +#[repr(transparent)] +#[derive(Clone, Copy)] +pub struct InterestOptions { + options: u8, +} + +impl InterestOptions { + // Flags + pub const KEYEXPRS: InterestOptions = InterestOptions::options(1); + pub const SUBSCRIBERS: InterestOptions = InterestOptions::options(1 << 1); + pub const QUERYABLES: InterestOptions = InterestOptions::options(1 << 2); + pub const TOKENS: InterestOptions = InterestOptions::options(1 << 3); + const RESTRICTED: InterestOptions = InterestOptions::options(1 << 4); + const NAMED: InterestOptions = InterestOptions::options(1 << 5); + const MAPPING: InterestOptions = InterestOptions::options(1 << 6); + pub const AGGREGATE: InterestOptions = InterestOptions::options(1 << 7); + pub const ALL: InterestOptions = InterestOptions::options( + InterestOptions::KEYEXPRS.options + | InterestOptions::SUBSCRIBERS.options + | InterestOptions::QUERYABLES.options + | InterestOptions::TOKENS.options, + ); + + const fn options(options: u8) -> Self { + Self { options } + } + + pub const fn empty() -> Self { + Self { options: 0 } + } + + pub const fn keyexprs(&self) -> bool { + imsg::has_flag(self.options, Self::KEYEXPRS.options) + } + + pub const fn subscribers(&self) -> bool { + imsg::has_flag(self.options, Self::SUBSCRIBERS.options) + } + + pub const fn queryables(&self) -> bool { + imsg::has_flag(self.options, Self::QUERYABLES.options) + } + + pub const fn tokens(&self) -> bool { + imsg::has_flag(self.options, Self::TOKENS.options) + } + + pub const fn restricted(&self) -> bool { + imsg::has_flag(self.options, Self::RESTRICTED.options) + } + + pub const fn named(&self) -> bool { + imsg::has_flag(self.options, Self::NAMED.options) + } + + pub const fn mapping(&self) -> bool { + imsg::has_flag(self.options, Self::MAPPING.options) + } + + pub const fn aggregate(&self) -> bool { + imsg::has_flag(self.options, Self::AGGREGATE.options) + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + + let mut s = Self::empty(); + if rng.gen_bool(0.5) { + s += InterestOptions::KEYEXPRS; + } + if rng.gen_bool(0.5) { + s += InterestOptions::SUBSCRIBERS; + } + if rng.gen_bool(0.5) { + s += InterestOptions::TOKENS; + } + if rng.gen_bool(0.5) { + s += InterestOptions::AGGREGATE; + } + s + } +} + +impl PartialEq for InterestOptions { + fn eq(&self, other: &Self) -> bool { + self.keyexprs() == other.keyexprs() + && self.subscribers() == other.subscribers() + && self.queryables() == other.queryables() + && self.tokens() == other.tokens() + && self.aggregate() == other.aggregate() + } +} + +impl Debug for InterestOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Interest {{ ")?; + if self.keyexprs() { + write!(f, "K:Y, ")?; + } else { + write!(f, "K:N, ")?; + } + if self.subscribers() { + write!(f, "S:Y, ")?; + } else { + write!(f, "S:N, ")?; + } + if self.queryables() { + write!(f, "Q:Y, ")?; + } else { + write!(f, "Q:N, ")?; + } + if self.tokens() { + write!(f, "T:Y, ")?; + } else { + write!(f, "T:N, ")?; + } + if self.aggregate() { + write!(f, "A:Y")?; + } else { + write!(f, "A:N")?; + } + write!(f, " }}")?; + Ok(()) + } +} + +impl Eq for InterestOptions {} + +impl Add for InterestOptions { + type Output = Self; + + #[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest + fn add(self, rhs: Self) -> Self::Output { + Self { + options: self.options | rhs.options, + } + } +} + +impl AddAssign for InterestOptions { + #[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest + fn add_assign(&mut self, rhs: Self) { + self.options |= rhs.options; + } +} + +impl Sub for InterestOptions { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + options: self.options & !rhs.options, + } + } +} + +impl SubAssign for InterestOptions { + fn sub_assign(&mut self, rhs: Self) { + self.options &= !rhs.options; + } +} + +impl From for InterestOptions { + fn from(options: u8) -> Self { + Self { options } + } +} diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index e60388f425..5a0635c9e0 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // pub mod declare; +pub mod interest; pub mod oam; pub mod push; pub mod request; @@ -20,10 +21,10 @@ pub mod response; use core::fmt; pub use declare::{ - Declare, DeclareBody, DeclareFinal, DeclareInterest, DeclareKeyExpr, DeclareMode, - DeclareQueryable, DeclareSubscriber, DeclareToken, UndeclareKeyExpr, UndeclareQueryable, - UndeclareSubscriber, UndeclareToken, + Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, + DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken, }; +pub use interest::Interest; pub use oam::Oam; pub use push::Push; pub use request::{AtomicRequestId, Request, RequestId}; @@ -40,6 +41,7 @@ pub mod id { pub const REQUEST: u8 = 0x1c; pub const RESPONSE: u8 = 0x1b; pub const RESPONSE_FINAL: u8 = 0x1a; + pub const INTEREST: u8 = 0x19; } #[repr(u8)] @@ -73,6 +75,7 @@ pub enum NetworkBody { Request(Request), Response(Response), ResponseFinal(ResponseFinal), + Interest(Interest), Declare(Declare), OAM(Oam), } @@ -117,6 +120,7 @@ impl NetworkMessage { NetworkBody::Request(msg) => msg.ext_qos.is_express(), NetworkBody::Response(msg) => msg.ext_qos.is_express(), NetworkBody::ResponseFinal(msg) => msg.ext_qos.is_express(), + NetworkBody::Interest(msg) => msg.ext_qos.is_express(), NetworkBody::Declare(msg) => msg.ext_qos.is_express(), NetworkBody::OAM(msg) => msg.ext_qos.is_express(), } @@ -133,6 +137,7 @@ impl NetworkMessage { NetworkBody::Request(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::Response(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(), + NetworkBody::Interest(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::OAM(msg) => msg.ext_qos.get_congestion_control(), }; @@ -147,6 +152,7 @@ impl NetworkMessage { NetworkBody::Request(msg) => msg.ext_qos.get_priority(), NetworkBody::Response(msg) => msg.ext_qos.get_priority(), NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_priority(), + NetworkBody::Interest(msg) => msg.ext_qos.get_priority(), NetworkBody::Declare(msg) => msg.ext_qos.get_priority(), NetworkBody::OAM(msg) => msg.ext_qos.get_priority(), } @@ -162,6 +168,7 @@ impl fmt::Display for NetworkMessage { Request(_) => write!(f, "Request"), Response(_) => write!(f, "Response"), ResponseFinal(_) => write!(f, "ResponseFinal"), + Interest(_) => write!(f, "Interest"), Declare(_) => write!(f, "Declare"), } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index bf569d0345..09edde884e 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -145,7 +145,10 @@ pub fn map_zmsg_to_shminfo(msg: &mut NetworkMessage) -> ZResult { ResponseBody::Reply(b) => b.map_to_shminfo(), ResponseBody::Err(b) => b.map_to_shminfo(), }, - NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), + NetworkBody::ResponseFinal(_) + | NetworkBody::Interest(_) + | NetworkBody::Declare(_) + | NetworkBody::OAM(_) => Ok(false), } } @@ -196,7 +199,10 @@ pub fn map_zmsg_to_shmbuf( ResponseBody::Reply(b) => b.map_to_shmbuf(shmr), ResponseBody::Err(b) => b.map_to_shmbuf(shmr), }, - NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), + NetworkBody::ResponseFinal(_) + | NetworkBody::Interest(_) + | NetworkBody::Declare(_) + | NetworkBody::OAM(_) => Ok(false), } } diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index 1e8da2c3c9..d2bfb5bcfe 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -53,7 +53,7 @@ pub use zenoh_keyexpr::*; pub use zenoh_macros::{kedefine, keformat, kewrite}; use zenoh_protocol::{ core::{key_expr::canon::Canonizable, ExprId, WireExpr}, - network::{declare, DeclareBody, DeclareMode, Mapping, UndeclareKeyExpr}, + network::{declare, DeclareBody, Mapping, UndeclareKeyExpr}, }; use zenoh_result::ZResult; @@ -664,7 +664,7 @@ impl SyncResolve for KeyExprUndeclaration<'_> { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(zenoh_protocol::network::Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index d62e410c81..e58e01a1b5 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -67,6 +67,7 @@ impl TransportPeerEventHandler for DeMux { match msg.body { NetworkBody::Push(m) => self.face.send_push(m), NetworkBody::Declare(m) => self.face.send_declare(m), + NetworkBody::Interest(_) => todo!(), NetworkBody::Request(m) => self.face.send_request(m), NetworkBody::Response(m) => self.face.send_response(m), NetworkBody::ResponseFinal(m) => self.face.send_response_final(m), diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index fd85280be0..d3aa8097ca 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -18,11 +18,15 @@ use std::any::Any; pub use demux::*; pub use mux::*; -use zenoh_protocol::network::{Declare, Push, Request, Response, ResponseFinal}; +use zenoh_protocol::network::{ + interest::Interest, Declare, Push, Request, Response, ResponseFinal, +}; use super::routing::RoutingContext; pub trait Primitives: Send + Sync { + fn send_interest(&self, msg: Interest); + fn send_declare(&self, msg: Declare); fn send_push(&self, msg: Push); @@ -56,6 +60,8 @@ pub(crate) trait EPrimitives: Send + Sync { pub struct DummyPrimitives; impl Primitives for DummyPrimitives { + fn send_interest(&self, _msg: Interest) {} + fn send_declare(&self, _msg: Declare) {} fn send_push(&self, _msg: Push) {} diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index 5c473e8ad8..ccb2452f30 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -19,7 +19,8 @@ use crate::net::routing::{ }; use std::sync::OnceLock; use zenoh_protocol::network::{ - Declare, NetworkBody, NetworkMessage, Push, Request, Response, ResponseFinal, + interest::Interest, Declare, NetworkBody, NetworkMessage, Push, Request, Response, + ResponseFinal, }; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; @@ -40,6 +41,34 @@ impl Mux { } impl Primitives for Mux { + fn send_interest(&self, msg: Interest) { + let msg = NetworkMessage { + body: NetworkBody::Interest(msg), + #[cfg(feature = "stats")] + size: None, + }; + if self.interceptor.interceptors.is_empty() { + let _ = self.handler.schedule(msg); + } else if let Some(face) = self.face.get() { + let Some(face) = face.upgrade() else { + log::debug!("Invalid face: {:?}. Interest not sent: {:?}", face, msg); + return; + }; + let ctx = RoutingContext::new_out(msg, face.clone()); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); + } + } else { + log::debug!("Uninitialized multiplexer. Interest not sent: {:?}", msg); + } + } + fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), @@ -316,6 +345,30 @@ impl McastMux { } impl Primitives for McastMux { + fn send_interest(&self, msg: Interest) { + let msg = NetworkMessage { + body: NetworkBody::Interest(msg), + #[cfg(feature = "stats")] + size: None, + }; + if self.interceptor.interceptors.is_empty() { + let _ = self.handler.schedule(msg); + } else if let Some(face) = self.face.get() { + let ctx = RoutingContext::new_out(msg, face.clone()); + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { + let _ = self.handler.schedule(ctx.msg); + } + } else { + log::error!("Uninitialized multiplexer!"); + } + } + fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 371edee57b..29c3f0da2f 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -154,7 +154,7 @@ impl fmt::Display for FaceState { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct WeakFace { pub(crate) tables: Weak, pub(crate) state: Weak, @@ -185,6 +185,10 @@ impl Face { } impl Primitives for Face { + fn send_interest(&self, _msg: zenoh_protocol::network::Interest) { + todo!() + } + fn send_declare(&self, msg: zenoh_protocol::network::Declare) { let ctrl_lock = zlock!(self.tables.ctrl_lock); match msg.body { @@ -238,8 +242,7 @@ impl Primitives for Face { } zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(), zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(), - zenoh_protocol::network::DeclareBody::DeclareInterest(_m) => todo!(), - zenoh_protocol::network::DeclareBody::DeclareFinal(_m) => todo!(), + zenoh_protocol::network::DeclareBody::DeclareFinal(_) => todo!(), } drop(ctrl_lock); } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 58a081d743..62193cdf93 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -27,7 +27,7 @@ use zenoh_protocol::{ network::{ declare::{ ext, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, - DeclareBody, DeclareKeyExpr, DeclareMode, + DeclareBody, DeclareKeyExpr, }, Mapping, }, @@ -465,7 +465,7 @@ impl Resource { .insert(expr_id, nonwild_prefix.clone()); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 6c689d3336..e85bb77bf9 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareMode, DeclareSubscriber, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -53,7 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -137,7 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -171,7 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -206,7 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 28e1d75460..5c0bc5349b 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareMode, DeclareQueryable, UndeclareQueryable, + DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -93,7 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -165,7 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -418,7 +418,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -460,7 +460,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 356793e3a3..150c12a632 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -36,7 +36,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareMode, DeclareQueryable, UndeclareQueryable, + DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -126,7 +126,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -170,7 +170,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -339,7 +339,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -365,7 +365,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 5ac0b22846..b495248788 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareMode, DeclareSubscriber, UndeclareSubscriber, + DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -53,7 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -137,7 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -171,7 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -206,7 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index c2d62c7658..72c32b9217 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareMode, DeclareQueryable, UndeclareQueryable, + DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -93,7 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -165,7 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -412,7 +412,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -564,7 +564,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -606,7 +606,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -635,7 +635,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -774,7 +774,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -800,7 +800,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index e647cf2dc7..99e787beb5 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -36,7 +36,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareMode, DeclareQueryable, UndeclareQueryable, + DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -194,7 +194,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -248,7 +248,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -473,7 +473,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -499,7 +499,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -775,7 +775,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -874,7 +874,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -900,7 +900,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 77f51c16b3..75b4d4ef6a 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -107,6 +107,7 @@ impl RoutingContext { NetworkBody::Request(m) => Some(&m.wire_expr), NetworkBody::Response(m) => Some(&m.wire_expr), NetworkBody::ResponseFinal(_) => None, + NetworkBody::Interest(m) => m.wire_expr.as_ref(), NetworkBody::Declare(m) => match &m.body { DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr), DeclareBody::UndeclareKeyExpr(_) => None, @@ -116,7 +117,6 @@ impl RoutingContext { DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr), DeclareBody::DeclareToken(m) => Some(&m.wire_expr), DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr), - DeclareBody::DeclareInterest(m) => m.wire_expr.as_ref(), DeclareBody::DeclareFinal(_) => None, }, NetworkBody::OAM(_) => None, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 5b5b41b390..78ece859c7 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -33,6 +33,7 @@ use zenoh_buffers::buffer::SplitBuffer; use zenoh_config::{ConfigValidator, ValidatedMap, WhatAmI}; use zenoh_plugin_trait::{PluginControl, PluginStatus}; use zenoh_protocol::network::declare::QueryableId; +use zenoh_protocol::network::Interest; use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, @@ -40,8 +41,8 @@ use zenoh_protocol::{ }, network::{ declare::{queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo}, - ext, Declare, DeclareBody, DeclareMode, DeclareQueryable, DeclareSubscriber, Push, Request, - Response, ResponseFinal, + ext, Declare, DeclareBody, DeclareQueryable, DeclareSubscriber, Push, Request, Response, + ResponseFinal, }, zenoh::{PushBody, RequestBody}, }; @@ -277,7 +278,7 @@ impl AdminSpace { zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, @@ -290,7 +291,7 @@ impl AdminSpace { }); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -320,6 +321,10 @@ impl AdminSpace { } impl Primitives for AdminSpace { + fn send_interest(&self, msg: Interest) { + log::trace!("Recv interest {:?}", msg); + } + fn send_declare(&self, msg: Declare) { log::trace!("Recv declare {:?}", msg); if let DeclareBody::DeclareKeyExpr(m) = msg.body { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 35db2a7ac4..841bc209f6 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -26,7 +26,7 @@ use zenoh_protocol::core::{ key_expr::keyexpr, ExprId, Reliability, WhatAmI, WireExpr, ZenohId, EMPTY_EXPR_ID, }; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; -use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr, DeclareMode}; +use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr}; use zenoh_protocol::zenoh::{PushBody, Put}; #[test] @@ -495,6 +495,8 @@ impl ClientPrimitives { } impl Primitives for ClientPrimitives { + fn send_interest(&self, _msg: zenoh_protocol::network::Interest) {} + fn send_declare(&self, msg: zenoh_protocol::network::Declare) { match msg.body { DeclareBody::DeclareKeyExpr(d) => { @@ -579,7 +581,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -607,7 +609,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -629,7 +631,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -657,7 +659,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -679,7 +681,7 @@ fn client_test() { Primitives::send_declare( primitives2.as_ref(), Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 3f1c382a66..29ad9c2b00 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -69,7 +69,7 @@ use zenoh_protocol::{ network::{ declare::{ self, common::ext::WireExprType, queryable::ext::QueryableInfoType, - subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, DeclareMode, + subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, }, request::{self, ext::TargetType, Request}, @@ -893,7 +893,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1106,7 +1106,7 @@ impl Session { // }; primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1163,7 +1163,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1215,7 +1215,7 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1237,7 +1237,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1273,7 +1273,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1298,7 +1298,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - mode: DeclareMode::Push, + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -1984,6 +1984,9 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { } impl Primitives for Session { + fn send_interest(&self, msg: zenoh_protocol::network::Interest) { + trace!("recv Interest {} {:?}", msg.id, msg.wire_expr); + } fn send_declare(&self, msg: zenoh_protocol::network::Declare) { match msg.body { zenoh_protocol::network::DeclareBody::DeclareKeyExpr(m) => { @@ -2086,7 +2089,6 @@ impl Primitives for Session { } DeclareBody::DeclareToken(_) => todo!(), DeclareBody::UndeclareToken(_) => todo!(), - DeclareBody::DeclareInterest(_) => todo!(), DeclareBody::DeclareFinal(_) => todo!(), } }