Skip to content

Commit

Permalink
SHM: optimize metadata (#1714)
Browse files Browse the repository at this point in the history
* Optimize SHM internals by combining watchdog and header data together into sigle "Metadata" structure

* Update segment.rs, subscription.rs, and 2 more files...

* fix typo
remove extra confirmation

* fix test feature code

* Update tests

* Code format fix (cargo -> rustfmt)

* - remove unnecssary trait impls
- change three_node_combination_multicast ports

* Put ProtocolID and ChunkDescriptor into SHM metadata: less wire overhead and less structure sizes for SHM

* code format fix

* review fixes

* no need to explicitly store ChunkDescriptor in BusyChunk as now it is stored in shared metadata

* SHM version support with backward-safety

* Update version.rs

* - fix is_valid() check implementation for SHM buffer
- get rid of pointers in OwnedMetadataDescriptor
- get rid of some "unsafe" in OwnedMetadataDescriptor

* Update commons/zenoh-shm/tests/metadata.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update commons/zenoh-shm/tests/metadata.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update io/zenoh-transport/src/unicast/establishment/ext/shm.rs

Co-authored-by: Luca Cominardi <[email protected]>

* more review fixes

* fix clippy

* fix uninitialized ref problem by getting rid of problematic lockfree crate

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
yellowhatter and Mallets authored Jan 29, 2025
1 parent 5343d10 commit 9ba08eb
Show file tree
Hide file tree
Showing 36 changed files with 927 additions and 1,041 deletions.
26 changes: 10 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const_format = "0.2.33"
crc = "3.2.1"
criterion = "0.5"
crossbeam-utils = "0.8.20"
crossbeam-queue = "0.3.12"
derive_more = { version = "1.0.0", features = ["as_ref"] }
derive-new = "0.7.0"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
Expand All @@ -118,7 +119,6 @@ leb128 = "0.2"
libc = "0.2.158"
libloading = "0.8"
tracing = "0.1"
lockfree = "0.5"
lz4_flex = "0.11"
nix = { version = "0.29.0", features = ["fs"] }
num_cpus = "1.16.0"
Expand Down
67 changes: 10 additions & 57 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo,
};

use crate::{RCodec, WCodec, Zenoh080};

impl<W> WCodec<&Descriptor, &mut W> for Zenoh080
impl<W> WCodec<&MetadataDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Descriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index_and_bitpos)?;
Ok(())
}
}

impl<W> WCodec<&HeaderDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &HeaderDescriptor) -> Self::Output {
fn write(self, writer: &mut W, x: &MetadataDescriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index)?;
Ok(())
Expand Down Expand Up @@ -84,52 +70,29 @@ where

fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output {
let ShmBufInfo {
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
metadata,
generation,
} = x;

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, metadata)?;
self.write(&mut *writer, generation)?;
Ok(())
}
}

impl<R> RCodec<Descriptor, &mut R> for Zenoh080
impl<R> RCodec<MetadataDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Descriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index_and_bitpos = self.read(&mut *reader)?;

Ok(Descriptor {
id,
index_and_bitpos,
})
}
}

impl<R> RCodec<HeaderDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<HeaderDescriptor, Self::Error> {
fn read(self, reader: &mut R) -> Result<MetadataDescriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index = self.read(&mut *reader)?;

Ok(HeaderDescriptor { id, index })
Ok(MetadataDescriptor { id, index })
}
}

Expand Down Expand Up @@ -172,21 +135,11 @@ where
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<ShmBufInfo, Self::Error> {
let data_descriptor = self.read(&mut *reader)?;
let shm_protocol = self.read(&mut *reader)?;
let data_len = self.read(&mut *reader)?;
let watchdog_descriptor = self.read(&mut *reader)?;
let header_descriptor = self.read(&mut *reader)?;
let metadata = self.read(&mut *reader)?;
let generation = self.read(&mut *reader)?;

let shm_info = ShmBufInfo::new(
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
generation,
);
let shm_info = ShmBufInfo::new(data_len, metadata, generation);
Ok(shm_info)
}
}
13 changes: 2 additions & 11 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,13 @@ fn codec_encoding() {
#[cfg(feature = "shared-memory")]
#[test]
fn codec_shm_info() {
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
};
use zenoh_shm::{metadata::descriptor::MetadataDescriptor, ShmBufInfo};

run!(ShmBufInfo, {
let mut rng = rand::thread_rng();
ShmBufInfo::new(
ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
rng.gen(),
Descriptor {
id: rng.gen(),
index_and_bitpos: rng.gen(),
},
HeaderDescriptor {
MetadataDescriptor {
id: rng.gen(),
index: rng.gen(),
},
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ static_init = { workspace = true }
num-traits = { workspace = true }
num_cpus = { workspace = true, optional = true }
thread-priority = { workspace = true }
lockfree = { workspace = true }
stabby = { workspace = true }
crossbeam-queue = { workspace = true }

[target.'cfg(unix)'.dependencies]
advisory-lock = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/provider/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID};
/// Uniquely identifies the particular chunk within particular segment
#[zenoh_macros::unstable_doc]
#[derive(Clone, Debug, PartialEq, Eq)]
#[stabby::stabby]
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
Expand Down
Loading

0 comments on commit 9ba08eb

Please sign in to comment.