diff --git a/Cargo.lock b/Cargo.lock index 6f7eaf1345..1bee717506 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1036,6 +1036,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2040,15 +2049,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "lockfree" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" -dependencies = [ - "owned-alloc", -] - [[package]] name = "log" version = "0.4.22" @@ -2435,12 +2435,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" -[[package]] -name = "owned-alloc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" - [[package]] name = "parking" version = "2.2.1" @@ -5622,8 +5616,8 @@ dependencies = [ "advisory-lock", "async-trait", "crc", + "crossbeam-queue", "libc", - "lockfree", "num-traits", "num_cpus", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index bd25ee220f..75a54f8808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ const_format = "0.2.33" crc = "3.2.1" criterion = "0.5" crossbeam-utils = "0.8.20" +crossbeam-queue = "0.3.12" derive_more = { version = "1.0.0", features = ["as_ref"] } derive-new = "0.7.0" tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } @@ -118,7 +119,6 @@ leb128 = "0.2" libc = "0.2.158" libloading = "0.8" tracing = "0.1" -lockfree = "0.5" lz4_flex = "0.11" nix = { version = "0.29.0", features = ["fs"] } num_cpus = "1.16.0" diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index b67716611d..0408053679 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -18,32 +18,18 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_shm::{ - api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor, - watchdog::descriptor::Descriptor, ShmBufInfo, + api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo, }; use crate::{RCodec, WCodec, Zenoh080}; -impl WCodec<&Descriptor, &mut W> for Zenoh080 +impl WCodec<&MetadataDescriptor, &mut W> for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: &Descriptor) -> Self::Output { - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, x.index_and_bitpos)?; - Ok(()) - } -} - -impl WCodec<&HeaderDescriptor, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &HeaderDescriptor) -> Self::Output { + fn write(self, writer: &mut W, x: &MetadataDescriptor) -> Self::Output { self.write(&mut *writer, x.id)?; self.write(&mut *writer, x.index)?; Ok(()) @@ -84,52 +70,29 @@ where fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output { let ShmBufInfo { - data_descriptor, - shm_protocol, data_len, - watchdog_descriptor, - header_descriptor, + metadata, generation, } = x; - self.write(&mut *writer, data_descriptor)?; - self.write(&mut *writer, shm_protocol)?; self.write(&mut 
*writer, *data_len)?; - self.write(&mut *writer, watchdog_descriptor)?; - self.write(&mut *writer, header_descriptor)?; + self.write(&mut *writer, metadata)?; self.write(&mut *writer, generation)?; Ok(()) } } -impl RCodec for Zenoh080 +impl RCodec for Zenoh080 where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result { - let id = self.read(&mut *reader)?; - let index_and_bitpos = self.read(&mut *reader)?; - - Ok(Descriptor { - id, - index_and_bitpos, - }) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { + fn read(self, reader: &mut R) -> Result { let id = self.read(&mut *reader)?; let index = self.read(&mut *reader)?; - Ok(HeaderDescriptor { id, index }) + Ok(MetadataDescriptor { id, index }) } } @@ -172,21 +135,11 @@ where type Error = DidntRead; fn read(self, reader: &mut R) -> Result { - let data_descriptor = self.read(&mut *reader)?; - let shm_protocol = self.read(&mut *reader)?; let data_len = self.read(&mut *reader)?; - let watchdog_descriptor = self.read(&mut *reader)?; - let header_descriptor = self.read(&mut *reader)?; + let metadata = self.read(&mut *reader)?; let generation = self.read(&mut *reader)?; - let shm_info = ShmBufInfo::new( - data_descriptor, - shm_protocol, - data_len, - watchdog_descriptor, - header_descriptor, - generation, - ); + let shm_info = ShmBufInfo::new(data_len, metadata, generation); Ok(shm_info) } } diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index 7af1ce64d1..c969047a7f 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -361,22 +361,13 @@ fn codec_encoding() { #[cfg(feature = "shared-memory")] #[test] fn codec_shm_info() { - use zenoh_shm::{ - api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor, - watchdog::descriptor::Descriptor, ShmBufInfo, - }; + use zenoh_shm::{metadata::descriptor::MetadataDescriptor, ShmBufInfo}; run!(ShmBufInfo, { let mut rng = rand::thread_rng(); ShmBufInfo::new( - ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()), - rng.gen(), rng.gen(), - Descriptor { - id: rng.gen(), - index_and_bitpos: rng.gen(), - }, - HeaderDescriptor { + MetadataDescriptor { id: rng.gen(), index: rng.gen(), }, diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 8dbcb7c6b7..9039f7cf0c 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -46,8 +46,8 @@ static_init = { workspace = true } num-traits = { workspace = true } num_cpus = { workspace = true, optional = true } thread-priority = { workspace = true } -lockfree = { workspace = true } stabby = { workspace = true } +crossbeam-queue = { workspace = true } [target.'cfg(unix)'.dependencies] advisory-lock = { workspace = true } diff --git a/commons/zenoh-shm/src/api/provider/chunk.rs b/commons/zenoh-shm/src/api/provider/chunk.rs index fe7d0d5cb6..6d8ea8e884 100644 --- a/commons/zenoh-shm/src/api/provider/chunk.rs +++ b/commons/zenoh-shm/src/api/provider/chunk.rs @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID}; /// Uniquely identifies the particular chunk within particular segment #[zenoh_macros::unstable_doc] #[derive(Clone, Debug, PartialEq, Eq)] +#[stabby::stabby] pub struct ChunkDescriptor { pub segment: SegmentID, pub chunk: ChunkID, diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs index c517529df0..d84777378d 100644 --- 
a/commons/zenoh-shm/src/api/provider/shm_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -36,15 +36,12 @@ use super::{ }; use crate::{ api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID}, - header::{ - allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor, - storage::GLOBAL_HEADER_STORAGE, + metadata::{ + allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor, + storage::GLOBAL_METADATA_STORAGE, }, watchdog::{ - allocated_watchdog::AllocatedWatchdog, confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR}, - descriptor::Descriptor, - storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR, }, ShmBufInfo, ShmBufInner, @@ -52,22 +49,16 @@ use crate::{ #[derive(Debug)] struct BusyChunk { - descriptor: ChunkDescriptor, - header: AllocatedHeaderDescriptor, - _watchdog: AllocatedWatchdog, + metadata: AllocatedMetadataDescriptor, } impl BusyChunk { - fn new( - descriptor: ChunkDescriptor, - header: AllocatedHeaderDescriptor, - watchdog: AllocatedWatchdog, - ) -> Self { - Self { - descriptor, - header, - _watchdog: watchdog, - } + fn new(metadata: AllocatedMetadataDescriptor) -> Self { + Self { metadata } + } + + fn descriptor(&self) -> ChunkDescriptor { + self.metadata.header().data_descriptor() } } @@ -228,7 +219,7 @@ impl ForceDeallocPolicy for DeallocOptimal { }; drop(guard); - provider.backend.free(&chunk_to_dealloc.descriptor); + provider.backend.free(&chunk_to_dealloc.descriptor()); true } } @@ -242,7 +233,7 @@ impl ForceDeallocPolicy for DeallocYoungest { ) -> bool { match provider.busy_list.lock().unwrap().pop_back() { Some(val) => { - provider.backend.free(&val.descriptor); + provider.backend.free(&val.descriptor()); true } None => false, @@ -259,7 +250,7 @@ impl ForceDeallocPolicy for DeallocEldest { ) -> bool { match provider.busy_list.lock().unwrap().pop_front() { Some(val) => { - provider.backend.free(&val.descriptor); + provider.backend.free(&val.descriptor()); true } None => false, @@ -822,16 +813,10 @@ where let len = len.try_into()?; // allocate resources for SHM buffer - let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; + let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?; // wrap everything to ShmBufInner - let wrapped = self.wrap( - chunk, - len, - allocated_header, - allocated_watchdog, - confirmed_watchdog, - ); + let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata); Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } @@ -840,7 +825,7 @@ where #[zenoh_macros::unstable_doc] pub fn garbage_collect(&self) -> usize { fn is_free_chunk(chunk: &BusyChunk) -> bool { - let header = chunk.header.descriptor.header(); + let header = chunk.metadata.header(); if header.refcount.load(Ordering::SeqCst) != 0 { return header.watchdog_invalidated.load(Ordering::SeqCst); } @@ -854,8 +839,9 @@ where guard.retain(|maybe_free| { if is_free_chunk(maybe_free) { tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free); - self.backend.free(&maybe_free.descriptor); - largest = largest.max(maybe_free.descriptor.len.get()); + let descriptor_to_free = maybe_free.descriptor(); + self.backend.free(&descriptor_to_free); + largest = largest.max(descriptor_to_free.len.get()); return false; } true @@ -891,7 +877,7 @@ where Policy: AllocPolicy, { // allocate resources for SHM buffer - let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; + let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?; // 
allocate data chunk // Perform actions depending on the Policy @@ -902,82 +888,65 @@ where let chunk = Policy::alloc(layout, self)?; // wrap allocated chunk to ShmBufInner - let wrapped = self.wrap( - chunk, - size, - allocated_header, - allocated_watchdog, - confirmed_watchdog, - ); + let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata); Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } - fn alloc_resources() -> ZResult<( - AllocatedHeaderDescriptor, - AllocatedWatchdog, - ConfirmedDescriptor, - )> { - // allocate shared header - let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?; - - // allocate watchdog - let allocated_watchdog = GLOBAL_STORAGE.read().allocate_watchdog()?; + fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> { + // allocate metadata + let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?; // add watchdog to confirmator - let confirmed_watchdog = GLOBAL_CONFIRMATOR - .read() - .add_owned(&allocated_watchdog.descriptor)?; + let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone()); - Ok((allocated_header, allocated_watchdog, confirmed_watchdog)) + Ok((allocated_metadata, confirmed_metadata)) } fn wrap( &self, chunk: AllocatedChunk, len: NonZeroUsize, - allocated_header: AllocatedHeaderDescriptor, - allocated_watchdog: AllocatedWatchdog, - confirmed_watchdog: ConfirmedDescriptor, + allocated_metadata: AllocatedMetadataDescriptor, + confirmed_metadata: ConfirmedDescriptor, ) -> ShmBufInner { - let header = allocated_header.descriptor.clone(); - let descriptor = Descriptor::from(&allocated_watchdog.descriptor); + // write additional metadata + // chunk descriptor + allocated_metadata + .header() + .set_data_descriptor(&chunk.descriptor); + // protocol + allocated_metadata + .header() + .protocol + .store(self.id.id(), Ordering::Relaxed); // add watchdog to validator - let c_header = header.clone(); - GLOBAL_VALIDATOR.read().add( - allocated_watchdog.descriptor.clone(), - Box::new(move || { - c_header - .header() - .watchdog_invalidated - .store(true, Ordering::SeqCst); - }), - ); + GLOBAL_VALIDATOR + .read() + .add(confirmed_metadata.owned.clone()); // Create buffer's info let info = ShmBufInfo::new( - chunk.descriptor.clone(), - self.id.id(), len, - descriptor, - HeaderDescriptor::from(&header), - header.header().generation.load(Ordering::SeqCst), + MetadataDescriptor::from(&confirmed_metadata.owned), + allocated_metadata + .header() + .generation + .load(Ordering::SeqCst), ); // Create buffer let shmb = ShmBufInner { - header, + metadata: Arc::new(confirmed_metadata), buf: chunk.data, info, - watchdog: Arc::new(confirmed_watchdog), }; // Create and store busy chunk - self.busy_list.lock().unwrap().push_back(BusyChunk::new( - chunk.descriptor, - allocated_header, - allocated_watchdog, - )); + self.busy_list + .lock() + .unwrap() + .push_back(BusyChunk::new(allocated_metadata)); shmb } @@ -998,7 +967,7 @@ where Policy: AsyncAllocPolicy, { // allocate resources for SHM buffer - let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; + let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?; // allocate data chunk // Perform actions depending on the Policy @@ -1009,13 +978,7 @@ where let chunk = Policy::alloc_async(backend_layout, self).await?; // wrap allocated chunk to ShmBufInner - let wrapped = self.wrap( - chunk, - size, - allocated_header, - allocated_watchdog, - confirmed_watchdog, - ); + let wrapped = 
self.wrap(chunk, size, allocated_metadata, confirmed_metadata); Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } } diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index a5c3aacc4f..d8d9b551c4 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +use crossbeam_queue::SegQueue; use static_init::dynamic; use crate::posix_shm::cleanup::cleanup_orphaned_segments; @@ -23,7 +24,7 @@ pub(crate) static mut CLEANUP: Cleanup = Cleanup::new(); /// An RAII object that calls all registered routines upon destruction pub(crate) struct Cleanup { - cleanups: lockfree::queue::Queue>>, + cleanups: SegQueue>>, } impl Cleanup { diff --git a/commons/zenoh-shm/src/header/chunk_header.rs b/commons/zenoh-shm/src/header/chunk_header.rs index c5eb11bb7c..f5d0ca1c78 100644 --- a/commons/zenoh-shm/src/header/chunk_header.rs +++ b/commons/zenoh-shm/src/header/chunk_header.rs @@ -12,7 +12,12 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::{AtomicBool, AtomicU32}; +use std::{ + num::NonZeroUsize, + sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, +}; + +use crate::api::provider::chunk::ChunkDescriptor; // Chunk header #[stabby::stabby] @@ -25,4 +30,29 @@ pub struct ChunkHeaderType { pub refcount: AtomicU32, pub watchdog_invalidated: AtomicBool, pub generation: AtomicU32, + + /// Protocol identifier for particular SHM implementation + pub protocol: AtomicU32, + + /// The data chunk descriptor + segment: AtomicU32, + chunk: AtomicU32, + len: AtomicUsize, +} + +impl ChunkHeaderType { + pub fn set_data_descriptor(&self, descriptor: &ChunkDescriptor) { + self.segment.store(descriptor.segment, Ordering::Relaxed); + self.chunk.store(descriptor.chunk, Ordering::Relaxed); + self.len.store(descriptor.len.into(), Ordering::Relaxed); + } + + pub fn data_descriptor(&self) -> ChunkDescriptor { + ChunkDescriptor::new( + self.segment.load(Ordering::Relaxed), + self.chunk.load(Ordering::Relaxed), + // SAFETY: this is safe because Write access to self.len is available only from set_data_descriptor + unsafe { NonZeroUsize::new_unchecked(self.len.load(Ordering::Relaxed)) }, + ) + } } diff --git a/commons/zenoh-shm/src/header/descriptor.rs b/commons/zenoh-shm/src/header/descriptor.rs deleted file mode 100644 index 7700eb90c6..0000000000 --- a/commons/zenoh-shm/src/header/descriptor.rs +++ /dev/null @@ -1,63 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::sync::Arc; - -use super::{chunk_header::ChunkHeaderType, segment::HeaderSegment}; - -pub type HeaderSegmentID = u16; -pub type HeaderIndex = u16; - -#[derive(Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Debug)] -pub struct HeaderDescriptor { - pub id: HeaderSegmentID, - pub index: HeaderIndex, -} - -impl From<&OwnedHeaderDescriptor> for HeaderDescriptor { - fn from(item: &OwnedHeaderDescriptor) -> Self { - let id = item.segment.array.id(); - let index = unsafe { item.segment.array.index(item.header) }; - - Self { id, index } - } -} - -#[derive(Clone)] -pub struct OwnedHeaderDescriptor { - segment: Arc, - header: *const ChunkHeaderType, -} - -unsafe impl Send for OwnedHeaderDescriptor {} -unsafe impl Sync for OwnedHeaderDescriptor {} - -impl OwnedHeaderDescriptor { - pub(crate) fn new(segment: Arc, header: *const ChunkHeaderType) -> Self { - Self { segment, header } - } - - #[inline(always)] - pub fn header(&self) -> &ChunkHeaderType { - unsafe { &(*self.header) } - } -} - -impl std::fmt::Debug for OwnedHeaderDescriptor { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OwnedHeaderDescriptor") - .field("header", &self.header) - .finish() - } -} diff --git a/commons/zenoh-shm/src/header/mod.rs b/commons/zenoh-shm/src/header/mod.rs index 84acc86e87..9df661b74a 100644 --- a/commons/zenoh-shm/src/header/mod.rs +++ b/commons/zenoh-shm/src/header/mod.rs @@ -12,12 +12,4 @@ // ZettaScale Zenoh Team, // -pub mod descriptor; - -tested_crate_module!(storage); -tested_crate_module!(subscription); - -pub(crate) mod allocated_descriptor; pub(crate) mod chunk_header; - -mod segment; diff --git a/commons/zenoh-shm/src/header/segment.rs b/commons/zenoh-shm/src/header/segment.rs deleted file mode 100644 index ab2353c35d..0000000000 --- a/commons/zenoh-shm/src/header/segment.rs +++ /dev/null @@ -1,39 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use zenoh_result::ZResult; - -use super::{ - chunk_header::ChunkHeaderType, - descriptor::{HeaderIndex, HeaderSegmentID}, -}; -use crate::posix_shm::array::ArrayInSHM; - -const HEADER_SEGMENT_PREFIX: &str = "header"; - -pub struct HeaderSegment { - pub array: ArrayInSHM, -} - -impl HeaderSegment { - pub fn create(header_count: usize) -> ZResult { - let array = ArrayInSHM::create(header_count, HEADER_SEGMENT_PREFIX)?; - Ok(Self { array }) - } - - pub fn open(id: HeaderSegmentID) -> ZResult { - let array = ArrayInSHM::open(id, HEADER_SEGMENT_PREFIX)?; - Ok(Self { array }) - } -} diff --git a/commons/zenoh-shm/src/header/storage.rs b/commons/zenoh-shm/src/header/storage.rs deleted file mode 100644 index db556937d0..0000000000 --- a/commons/zenoh-shm/src/header/storage.rs +++ /dev/null @@ -1,86 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{ - collections::LinkedList, - sync::{Arc, Mutex}, -}; - -use static_init::dynamic; -use zenoh_result::{zerror, ZResult}; - -use super::{ - allocated_descriptor::AllocatedHeaderDescriptor, - descriptor::{HeaderIndex, OwnedHeaderDescriptor}, - segment::HeaderSegment, -}; - -#[dynamic(lazy, drop)] -pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap(); - -pub struct HeaderStorage { - available: Arc>>, -} - -impl HeaderStorage { - fn new(initial_header_count: usize) -> ZResult { - let initial_segment = Arc::new(HeaderSegment::create(initial_header_count)?); - let mut initially_available = LinkedList::::default(); - - for index in 0..initial_header_count { - let header = unsafe { initial_segment.array.elem(index as HeaderIndex) }; - let descriptor = OwnedHeaderDescriptor::new(initial_segment.clone(), header); - - // init generation (this is not really necessary, but we do) - descriptor - .header() - .generation - .store(0, std::sync::atomic::Ordering::SeqCst); - - initially_available.push_back(descriptor); - } - - Ok(Self { - available: Arc::new(Mutex::new(initially_available)), - }) - } - - pub fn allocate_header(&self) -> ZResult { - let mut guard = self.available.lock().map_err(|e| zerror!("{e}"))?; - let popped = guard.pop_front(); - drop(guard); - - let descriptor = popped.ok_or_else(|| zerror!("no free headers available"))?; - - //initialize header fields - let header = descriptor.header(); - header - .refcount - .store(1, std::sync::atomic::Ordering::SeqCst); - header - .watchdog_invalidated - .store(false, std::sync::atomic::Ordering::SeqCst); - - Ok(AllocatedHeaderDescriptor { descriptor }) - } - - pub fn reclaim_header(&self, header: OwnedHeaderDescriptor) { - // header deallocated - increment it's generation to invalidate any existing references - header - .header() - .generation - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let mut guard = self.available.lock().unwrap(); - guard.push_front(header); - } -} diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 36f2068db9..37cdf75df6 100644 --- a/commons/zenoh-shm/src/lib.rs +++ 
b/commons/zenoh-shm/src/lib.rs @@ -26,9 +26,9 @@ use std::{ }, }; -use api::{common::types::ProtocolID, provider::chunk::ChunkDescriptor}; -use header::descriptor::{HeaderDescriptor, OwnedHeaderDescriptor}; -use watchdog::{confirmator::ConfirmedDescriptor, descriptor::Descriptor}; +use api::common::types::ProtocolID; +use metadata::descriptor::MetadataDescriptor; +use watchdog::confirmator::ConfirmedDescriptor; use zenoh_buffers::ZSliceBuffer; #[macro_export] @@ -54,8 +54,10 @@ macro_rules! tested_crate_module { pub mod api; mod cleanup; pub mod header; +pub mod metadata; pub mod posix_shm; pub mod reader; +pub mod version; pub mod watchdog; /// Information about a [`ShmBufInner`]. @@ -63,50 +65,36 @@ pub mod watchdog; /// This that can be serialized and can be used to retrieve the [`ShmBufInner`] in a remote process. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ShmBufInfo { - /// The data chunk descriptor - pub data_descriptor: ChunkDescriptor, - /// Protocol identifier for particular SHM implementation - pub shm_protocol: ProtocolID, /// Actual data length /// NOTE: data_descriptor's len is >= of this len and describes the actual memory length /// dedicated in shared memory segment for this particular buffer. pub data_len: NonZeroUsize, - /// The watchdog descriptor - pub watchdog_descriptor: Descriptor, - /// The header descriptor - pub header_descriptor: HeaderDescriptor, - /// The generation of the buffer + /// Metadata descriptor + pub metadata: MetadataDescriptor, + /// Generation of the buffer pub generation: u32, } impl ShmBufInfo { pub fn new( - data_descriptor: ChunkDescriptor, - shm_protocol: ProtocolID, data_len: NonZeroUsize, - watchdog_descriptor: Descriptor, - header_descriptor: HeaderDescriptor, + metadata: MetadataDescriptor, generation: u32, ) -> ShmBufInfo { ShmBufInfo { - data_descriptor, - shm_protocol, data_len, - watchdog_descriptor, - header_descriptor, + metadata, generation, } } } /// A zenoh buffer in shared memory. 
-#[non_exhaustive] pub struct ShmBufInner { - pub(crate) header: OwnedHeaderDescriptor, + pub(crate) metadata: Arc, pub(crate) buf: AtomicPtr, pub info: ShmBufInfo, - pub(crate) watchdog: Arc, } impl PartialEq for ShmBufInner { @@ -122,7 +110,7 @@ impl Eq for ShmBufInner {} impl std::fmt::Debug for ShmBufInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ShmBufInner") - .field("header", &self.header) + .field("metadata", &self.metadata) .field("buf", &self.buf) .field("info", &self.info) .finish() @@ -130,12 +118,23 @@ impl std::fmt::Debug for ShmBufInner { } impl ShmBufInner { + pub fn protocol(&self) -> ProtocolID { + self.metadata + .owned + .header() + .protocol + .load(Ordering::Relaxed) + } + pub fn len(&self) -> NonZeroUsize { self.info.data_len } fn is_valid(&self) -> bool { - self.header.header().generation.load(Ordering::SeqCst) == self.info.generation + let header = self.metadata.owned.header(); + + !header.watchdog_invalidated.load(Ordering::SeqCst) + && header.generation.load(Ordering::SeqCst) == self.info.generation } fn is_unique(&self) -> bool { @@ -143,7 +142,7 @@ impl ShmBufInner { } pub fn ref_count(&self) -> u32 { - self.header.header().refcount.load(Ordering::SeqCst) + self.metadata.owned.header().refcount.load(Ordering::SeqCst) } /// Increments buffer's reference count @@ -153,7 +152,11 @@ impl ShmBufInner { /// of the reference counter can lead to memory being stalled until /// recovered by watchdog subsystem or forcibly deallocated pub unsafe fn inc_ref_count(&self) { - self.header.header().refcount.fetch_add(1, Ordering::SeqCst); + self.metadata + .owned + .header() + .refcount + .fetch_add(1, Ordering::SeqCst); } // PRIVATE: @@ -164,7 +167,11 @@ impl ShmBufInner { } unsafe fn dec_ref_count(&self) { - self.header.header().refcount.fetch_sub(1, Ordering::SeqCst); + self.metadata + .owned + .header() + .refcount + .fetch_sub(1, Ordering::SeqCst); } /// Gets a mutable slice. @@ -197,10 +204,9 @@ impl Clone for ShmBufInner { unsafe { self.inc_ref_count() }; let bp = self.buf.load(Ordering::SeqCst); ShmBufInner { - header: self.header.clone(), + metadata: self.metadata.clone(), buf: AtomicPtr::new(bp), info: self.info.clone(), - watchdog: self.watchdog.clone(), } } } diff --git a/commons/zenoh-shm/src/metadata/allocated_descriptor.rs b/commons/zenoh-shm/src/metadata/allocated_descriptor.rs new file mode 100644 index 0000000000..c9355d00fd --- /dev/null +++ b/commons/zenoh-shm/src/metadata/allocated_descriptor.rs @@ -0,0 +1,58 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+// ZettaScale Zenoh Team,
+//
+
+use std::ops::Deref;
+
+use super::{descriptor::OwnedMetadataDescriptor, storage::GLOBAL_METADATA_STORAGE};
+use crate::watchdog::validator::GLOBAL_VALIDATOR;
+
+#[derive(Debug)]
+pub struct AllocatedMetadataDescriptor {
+    descriptor: OwnedMetadataDescriptor,
+}
+
+impl AllocatedMetadataDescriptor {
+    pub fn new(descriptor: OwnedMetadataDescriptor) -> Self {
+        //initialize header fields
+        let header = descriptor.header();
+        header
+            .refcount
+            .store(1, std::sync::atomic::Ordering::SeqCst);
+        header
+            .watchdog_invalidated
+            .store(false, std::sync::atomic::Ordering::SeqCst);
+
+        // reset watchdog on allocation
+        descriptor.validate();
+
+        Self { descriptor }
+    }
+}
+
+impl Drop for AllocatedMetadataDescriptor {
+    fn drop(&mut self) {
+        GLOBAL_VALIDATOR.read().remove(self.descriptor.clone());
+        GLOBAL_METADATA_STORAGE
+            .read()
+            .reclaim(self.descriptor.clone());
+    }
+}
+
+impl Deref for AllocatedMetadataDescriptor {
+    type Target = OwnedMetadataDescriptor;
+
+    fn deref(&self) -> &Self::Target {
+        &self.descriptor
+    }
+}
diff --git a/commons/zenoh-shm/src/metadata/descriptor.rs b/commons/zenoh-shm/src/metadata/descriptor.rs
new file mode 100644
index 0000000000..9a115f41e5
--- /dev/null
+++ b/commons/zenoh-shm/src/metadata/descriptor.rs
@@ -0,0 +1,117 @@
+//
+// Copyright (c) 2025 ZettaScale Technology
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+// ZettaScale Zenoh Team,
+//
+
+use std::sync::{atomic::AtomicU64, Arc};
+
+use super::segment::MetadataSegment;
+use crate::header::chunk_header::ChunkHeaderType;
+
+pub type MetadataSegmentID = u16;
+pub type MetadataIndex = u16;
+
+#[derive(Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Debug)]
+pub struct MetadataDescriptor {
+    pub id: MetadataSegmentID,
+    pub index: MetadataIndex,
+}
+
+impl From<&OwnedMetadataDescriptor> for MetadataDescriptor {
+    fn from(item: &OwnedMetadataDescriptor) -> Self {
+        let id = item.segment.data.id();
+        let index = unsafe { item.segment.data.fast_index_compute(item.header) };
+
+        Self { id, index }
+    }
+}
+
+#[derive(Clone)]
+pub struct OwnedMetadataDescriptor {
+    pub(crate) segment: Arc<MetadataSegment>,
+    header: &'static ChunkHeaderType,
+    watchdog_atomic: &'static AtomicU64,
+    watchdog_mask: u64,
+}
+
+impl OwnedMetadataDescriptor {
+    pub(crate) fn new(
+        segment: Arc<MetadataSegment>,
+        header: &'static ChunkHeaderType,
+        watchdog_atomic: &'static AtomicU64,
+        watchdog_mask: u64,
+    ) -> Self {
+        Self {
+            segment,
+            header,
+            watchdog_atomic,
+            watchdog_mask,
+        }
+    }
+
+    #[inline(always)]
+    pub fn header(&self) -> &ChunkHeaderType {
+        self.header
+    }
+
+    pub fn confirm(&self) {
+        self.watchdog_atomic
+            .fetch_or(self.watchdog_mask, std::sync::atomic::Ordering::SeqCst);
+    }
+
+    pub(crate) fn validate(&self) -> u64 {
+        self.watchdog_atomic
+            .fetch_and(!self.watchdog_mask, std::sync::atomic::Ordering::SeqCst)
+            & self.watchdog_mask
+    }
+
+    #[cfg(feature = "test")]
+    pub fn test_validate(&self) -> u64 {
+        self.validate()
+    }
+}
+
+// The ordering strategy is important. See storage implementation for details
+impl Ord for OwnedMetadataDescriptor {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        match self
+            .watchdog_atomic
+            .as_ptr()
+            .cmp(&other.watchdog_atomic.as_ptr())
+        {
+            core::cmp::Ordering::Equal => self.watchdog_mask.cmp(&other.watchdog_mask),
+            ord => ord,
+        }
+    }
+}
+
+impl PartialOrd for OwnedMetadataDescriptor {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for OwnedMetadataDescriptor {
+    fn eq(&self, other: &Self) -> bool {
+        self.watchdog_atomic.as_ptr() == other.watchdog_atomic.as_ptr()
+            && self.watchdog_mask == other.watchdog_mask
+    }
+}
+impl Eq for OwnedMetadataDescriptor {}
+
+impl std::fmt::Debug for OwnedMetadataDescriptor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OwnedHeaderDescriptor")
+            .field("header", &self.header)
+            .finish()
+    }
+}
diff --git a/commons/zenoh-shm/src/header/allocated_descriptor.rs b/commons/zenoh-shm/src/metadata/mod.rs
similarity index 51%
rename from commons/zenoh-shm/src/header/allocated_descriptor.rs
rename to commons/zenoh-shm/src/metadata/mod.rs
index 6cf1d1d011..a10c3c9c8f 100644
--- a/commons/zenoh-shm/src/header/allocated_descriptor.rs
+++ b/commons/zenoh-shm/src/metadata/mod.rs
@@ -1,5 +1,5 @@
 //
-// Copyright (c) 2023 ZettaScale Technology
+// Copyright (c) 2025 ZettaScale Technology
 //
 // This program and the accompanying materials are made available under the
 // terms of the Eclipse Public License 2.0 which is available at
@@ -12,17 +12,11 @@
 // ZettaScale Zenoh Team,
 //
 
-use super::{descriptor::OwnedHeaderDescriptor, storage::GLOBAL_HEADER_STORAGE};
+pub mod descriptor;
 
-#[derive(Debug)]
-pub struct AllocatedHeaderDescriptor {
-    pub descriptor: OwnedHeaderDescriptor,
-}
+tested_crate_module!(storage);
+tested_crate_module!(subscription);
 
-impl Drop for AllocatedHeaderDescriptor {
-    fn drop(&mut self) {
-        GLOBAL_HEADER_STORAGE
-            .read()
-            .reclaim_header(self.descriptor.clone());
-    }
-}
+pub(crate) mod allocated_descriptor;
+
+mod segment;
diff --git a/commons/zenoh-shm/src/metadata/segment.rs b/commons/zenoh-shm/src/metadata/segment.rs
new file mode 100644
index 0000000000..a138a7a634
--- /dev/null
+++ b/commons/zenoh-shm/src/metadata/segment.rs
@@ -0,0 +1,70 @@
+//
+// Copyright (c) 2025 ZettaScale Technology
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+// ZettaScale Zenoh Team,
+//
+
+use std::sync::atomic::AtomicU64;
+
+use zenoh_result::ZResult;
+
+use super::descriptor::{MetadataIndex, MetadataSegmentID};
+use crate::{header::chunk_header::ChunkHeaderType, posix_shm::struct_in_shm::StructInSHM};
+
+const METADATA_SEGMENT_PREFIX: &str = "metadata";
+
+#[stabby::stabby]
+pub struct Metadata<const S: usize> {
+    headers: [ChunkHeaderType; S],
+    watchdogs: [AtomicU64; S], // todo: replace with (S + 63) / 64 when Rust supports it
+}
+
+impl<const S: usize> Metadata<S> {
+    // SAFETY: this is safe if header belongs to current Metadata instance
+    pub unsafe fn fast_index_compute(&self, header: *const ChunkHeaderType) -> MetadataIndex {
+        header.offset_from(self.headers.as_ptr()) as MetadataIndex
+    }
+
+    // SAFETY: this is safe if index is in bounds!
+ pub unsafe fn fast_elem_compute( + &self, + index: MetadataIndex, + ) -> (&'static ChunkHeaderType, &'static AtomicU64, u64) { + let watchdog_index = index / 64; + let watchdog_mask_index = index % 64; + ( + &*(self.headers.as_ptr().offset(index as isize)), + &*(self.watchdogs.as_ptr().offset(watchdog_index as isize)), + 1u64 << watchdog_mask_index, + ) + } + + #[inline(always)] + pub const fn count(&self) -> usize { + S + } +} + +pub struct MetadataSegment { + pub data: StructInSHM>, +} + +impl MetadataSegment { + pub fn create() -> ZResult { + let data = StructInSHM::create(METADATA_SEGMENT_PREFIX)?; + Ok(Self { data }) + } + + pub fn open(id: MetadataSegmentID) -> ZResult { + let data = StructInSHM::open(id, METADATA_SEGMENT_PREFIX)?; + Ok(Self { data }) + } +} diff --git a/commons/zenoh-shm/src/metadata/storage.rs b/commons/zenoh-shm/src/metadata/storage.rs new file mode 100644 index 0000000000..cae0dfe1cc --- /dev/null +++ b/commons/zenoh-shm/src/metadata/storage.rs @@ -0,0 +1,87 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + collections::BTreeSet, + sync::{Arc, Mutex}, +}; + +use static_init::dynamic; +use zenoh_core::zlock; +use zenoh_result::{zerror, ZResult}; + +use super::{ + allocated_descriptor::AllocatedMetadataDescriptor, + descriptor::{MetadataIndex, OwnedMetadataDescriptor}, + segment::MetadataSegment, +}; + +#[dynamic(lazy, drop)] +pub static mut GLOBAL_METADATA_STORAGE: MetadataStorage = MetadataStorage::new().unwrap(); + +pub struct MetadataStorage { + available: Arc>>, +} + +impl MetadataStorage { + fn new() -> ZResult { + let initial_segment = Arc::new(MetadataSegment::create()?); + // See ordering implementation for OwnedMetadataDescriptor + #[allow(clippy::mutable_key_type)] + let mut initially_available = BTreeSet::::default(); + + for index in 0..initial_segment.data.count() { + let (header, watchdog, mask) = unsafe { + initial_segment + .data + .fast_elem_compute(index as MetadataIndex) + }; + let descriptor = + OwnedMetadataDescriptor::new(initial_segment.clone(), header, watchdog, mask); + + // init generation (this is not really necessary, but we do) + descriptor + .header() + .generation + .store(0, std::sync::atomic::Ordering::SeqCst); + + initially_available.insert(descriptor); + } + + Ok(Self { + available: Arc::new(Mutex::new(initially_available)), + }) + } + + pub fn allocate(&self) -> ZResult { + let mut guard = zlock!(self.available); + let popped = guard.pop_first(); + drop(guard); + + let descriptor = popped.ok_or_else(|| zerror!("no free headers available"))?; + + Ok(AllocatedMetadataDescriptor::new(descriptor)) + } + + pub fn reclaim(&self, descriptor: OwnedMetadataDescriptor) { + // header deallocated - increment it's generation to invalidate any existing references + descriptor + .header() + .generation + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let mut guard = self.available.lock().unwrap(); + let _new_insert = guard.insert(descriptor); + #[cfg(feature = "test")] + assert!(_new_insert); + } +} diff --git a/commons/zenoh-shm/src/header/subscription.rs b/commons/zenoh-shm/src/metadata/subscription.rs similarity index 
60% rename from commons/zenoh-shm/src/header/subscription.rs rename to commons/zenoh-shm/src/metadata/subscription.rs index 6f92960aaa..f7e83aa9f2 100644 --- a/commons/zenoh-shm/src/header/subscription.rs +++ b/commons/zenoh-shm/src/metadata/subscription.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2023 ZettaScale Technology +// Copyright (c) 2025 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -20,15 +20,15 @@ use static_init::dynamic; use zenoh_result::{zerror, ZResult}; use super::{ - descriptor::{HeaderDescriptor, HeaderSegmentID, OwnedHeaderDescriptor}, - segment::HeaderSegment, + descriptor::{MetadataDescriptor, MetadataSegmentID, OwnedMetadataDescriptor}, + segment::MetadataSegment, }; #[dynamic(lazy, drop)] -pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new(); +pub static mut GLOBAL_METADATA_SUBSCRIPTION: Subscription = Subscription::new(); pub struct Subscription { - linked_table: Mutex>>, + linked_table: Mutex>>, } impl Subscription { @@ -38,12 +38,12 @@ impl Subscription { } } - pub fn link(&self, descriptor: &HeaderDescriptor) -> ZResult { + pub fn link(&self, descriptor: &MetadataDescriptor) -> ZResult { let mut guard = self.linked_table.lock().map_err(|e| zerror!("{e}"))?; // ensure segment let segment = match guard.entry(descriptor.id) { std::collections::btree_map::Entry::Vacant(vacant) => { - let segment = Arc::new(HeaderSegment::open(descriptor.id)?); + let segment = Arc::new(MetadataSegment::open(descriptor.id)?); vacant.insert(segment.clone()); segment } @@ -52,9 +52,11 @@ impl Subscription { drop(guard); // construct owned descriptor - // SAFETY: HeaderDescriptor source guarantees that descriptor.index is valid for segment - let header = unsafe { segment.array.elem(descriptor.index) }; - let owned_descriptor = OwnedHeaderDescriptor::new(segment, header); + // SAFETY: MetadataDescriptor source guarantees that descriptor.index is valid for segment + let (header, watchdog, watchdog_mask) = + unsafe { segment.data.fast_elem_compute(descriptor.index) }; + let owned_descriptor = + OwnedMetadataDescriptor::new(segment, header, watchdog, watchdog_mask); Ok(owned_descriptor) } } diff --git a/commons/zenoh-shm/src/posix_shm/mod.rs b/commons/zenoh-shm/src/posix_shm/mod.rs index c9d3af32e0..bc553d008c 100644 --- a/commons/zenoh-shm/src/posix_shm/mod.rs +++ b/commons/zenoh-shm/src/posix_shm/mod.rs @@ -15,5 +15,6 @@ pub mod array; #[cfg(target_os = "linux")] pub(crate) mod segment_lock; +pub mod struct_in_shm; tested_crate_module!(segment); pub(crate) mod cleanup; diff --git a/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs b/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs new file mode 100644 index 0000000000..009bac8e0e --- /dev/null +++ b/commons/zenoh-shm/src/posix_shm/struct_in_shm.rs @@ -0,0 +1,111 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + fmt::Display, + marker::PhantomData, + mem::size_of, + ops::{Deref, DerefMut}, +}; + +// use stabby::IStable; +use zenoh_result::ZResult; + +use super::segment::Segment; + +/// An SHM segment that contains data structure +#[derive(Debug)] +pub struct StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + ID: Clone + Display, +{ + inner: Segment, + _phantom: PhantomData, +} + +unsafe impl Sync for StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + ID: Clone + Display, +{ +} +unsafe impl Send for StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + ID: Clone + Display, +{ +} + +impl StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + // Elem: IStable, // todo: stabby does not support IStable for big arrays + ID: Clone + Display, +{ + // Perform compile time check that Elem is not a ZST + const _S: () = if size_of::() == 0 { + panic!("Elem is a ZST. ZSTs are not allowed"); + }; + + pub fn create(file_prefix: &str) -> ZResult { + let alloc_size = size_of::(); + let inner = Segment::create(alloc_size, file_prefix)?; + Ok(Self { + inner, + _phantom: PhantomData, + }) + } + + pub fn open(id: ID, file_prefix: &str) -> ZResult { + let inner = Segment::open(id, file_prefix)?; + Ok(Self { + inner, + _phantom: PhantomData, + }) + } + + pub fn id(&self) -> ID { + self.inner.id() + } + + /// Retrieves mut element + pub fn elem_mut(&self) -> *mut Elem { + self.inner.as_ptr() as *mut Elem + } +} + +impl Deref for StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + // Elem: IStable, // todo: stabby does not support IStable for big arrays + ID: Clone + Display, +{ + type Target = Elem; + + fn deref(&self) -> &Self::Target { + unsafe { &*(self.inner.as_ptr() as *const Elem) } + } +} + +impl DerefMut for StructInSHM +where + rand::distributions::Standard: rand::distributions::Distribution, + // Elem: IStable, // todo: stabby does not support IStable for big arrays + ID: Clone + Display, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *(self.inner.as_ptr() as *mut Elem) } + } +} diff --git a/commons/zenoh-shm/src/reader.rs b/commons/zenoh-shm/src/reader.rs index a62e8a147f..f65a12fc36 100644 --- a/commons/zenoh-shm/src/reader.rs +++ b/commons/zenoh-shm/src/reader.rs @@ -22,8 +22,9 @@ use crate::{ client::shm_segment::ShmSegment, client_storage::ShmClientStorage, common::types::{ProtocolID, SegmentID}, + provider::chunk::ChunkDescriptor, }, - header::subscription::GLOBAL_HEADER_SUBSCRIPTION, + metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION, watchdog::confirmator::GLOBAL_CONFIRMATOR, ShmBufInfo, ShmBufInner, }; @@ -46,21 +47,30 @@ impl ShmReader { Self { client_storage } } - pub fn read_shmbuf(&self, info: &ShmBufInfo) -> ZResult { + pub fn read_shmbuf(&self, info: ShmBufInfo) -> ZResult { // Read does not increment the reference count as it is assumed // that the sender of this buffer has incremented it for us. 
+ let metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&info.metadata)?; // attach to the watchdog before doing other things - let watchdog = Arc::new(GLOBAL_CONFIRMATOR.read().add(&info.watchdog_descriptor)?); - - let segment = self.ensure_segment(info)?; + let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata)); + + // retrieve data descriptor from metadata + let data_descriptor = confirmed_metadata.owned.header().data_descriptor(); + + let segment = self.ensure_data_segment( + confirmed_metadata + .owned + .header() + .protocol + .load(std::sync::atomic::Ordering::Relaxed), + &data_descriptor, + )?; + let buf = segment.map(data_descriptor.chunk)?; let shmb = ShmBufInner { - header: GLOBAL_HEADER_SUBSCRIPTION - .read() - .link(&info.header_descriptor)?, - buf: segment.map(info.data_descriptor.chunk)?, + metadata: confirmed_metadata, + buf, info: info.clone(), - watchdog, }; // Validate buffer @@ -70,8 +80,12 @@ impl ShmReader { } } - fn ensure_segment(&self, info: &ShmBufInfo) -> ZResult> { - let id = GlobalDataSegmentID::new(info.shm_protocol, info.data_descriptor.segment); + fn ensure_data_segment( + &self, + protocol_id: ProtocolID, + descriptor: &ChunkDescriptor, + ) -> ZResult> { + let id = GlobalDataSegmentID::new(protocol_id, descriptor.segment); // fastest path: try to get access to already mounted SHM segment // read lock allows concurrent execution of multiple requests @@ -101,7 +115,7 @@ impl ShmReader { // (common case) mount a new segment and add it to the map std::collections::hash_map::Entry::Vacant(vacant) => { - let new_segment = client.attach(info.data_descriptor.segment)?; + let new_segment = client.attach(descriptor.segment)?; Ok(vacant.insert(new_segment).clone()) } } diff --git a/commons/zenoh-shm/src/version.rs b/commons/zenoh-shm/src/version.rs new file mode 100644 index 0000000000..34036ba3da --- /dev/null +++ b/commons/zenoh-shm/src/version.rs @@ -0,0 +1,15 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub const SHM_VERSION: u64 = 1; diff --git a/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs b/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs deleted file mode 100644 index 6293b157d3..0000000000 --- a/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs +++ /dev/null @@ -1,35 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use super::{descriptor::OwnedDescriptor, storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR}; - -#[derive(Debug)] -pub struct AllocatedWatchdog { - pub descriptor: OwnedDescriptor, -} - -impl AllocatedWatchdog { - pub(crate) fn new(descriptor: OwnedDescriptor) -> Self { - // reset descriptor on allocation - descriptor.validate(); - Self { descriptor } - } -} - -impl Drop for AllocatedWatchdog { - fn drop(&mut self) { - GLOBAL_VALIDATOR.read().remove(self.descriptor.clone()); - GLOBAL_STORAGE.read().free_watchdog(self.descriptor.clone()); - } -} diff --git a/commons/zenoh-shm/src/watchdog/confirmator.rs b/commons/zenoh-shm/src/watchdog/confirmator.rs index 1a9ac0f04f..6002b74c5f 100644 --- a/commons/zenoh-shm/src/watchdog/confirmator.rs +++ b/commons/zenoh-shm/src/watchdog/confirmator.rs @@ -18,21 +18,20 @@ use std::{ time::Duration, }; +use crossbeam_queue::SegQueue; use static_init::dynamic; -use zenoh_result::{zerror, ZResult}; +use zenoh_core::{zread, zwrite}; -use super::{ - descriptor::{Descriptor, OwnedDescriptor, SegmentID}, - periodic_task::PeriodicTask, - segment::Segment, -}; +use super::periodic_task::PeriodicTask; +use crate::metadata::descriptor::{MetadataSegmentID, OwnedMetadataDescriptor}; #[dynamic(lazy, drop)] pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator = WatchdogConfirmator::new(Duration::from_millis(50)); +#[derive(Debug)] pub struct ConfirmedDescriptor { - pub owned: OwnedDescriptor, + pub owned: OwnedMetadataDescriptor, confirmed: Arc, } @@ -43,8 +42,7 @@ impl Drop for ConfirmedDescriptor { } impl ConfirmedDescriptor { - fn new(owned: OwnedDescriptor, confirmed: Arc) -> Self { - owned.confirm(); + fn new(owned: OwnedMetadataDescriptor, confirmed: Arc) -> Self { confirmed.add(owned.clone()); Self { owned, confirmed } } @@ -56,28 +54,23 @@ enum Transaction { Remove, } +#[derive(Debug, Default)] struct ConfirmedSegment { - segment: Arc, - transactions: lockfree::queue::Queue<(Transaction, OwnedDescriptor)>, + transactions: crossbeam_queue::SegQueue<(Transaction, OwnedMetadataDescriptor)>, } impl ConfirmedSegment { - fn new(segment: Arc) -> Self { - Self { - segment, - transactions: lockfree::queue::Queue::default(), - } - } - - fn add(&self, descriptor: OwnedDescriptor) { + fn add(&self, descriptor: OwnedMetadataDescriptor) { self.transactions.push((Transaction::Add, descriptor)); } - fn remove(&self, descriptor: OwnedDescriptor) { + fn remove(&self, descriptor: OwnedMetadataDescriptor) { self.transactions.push((Transaction::Remove, descriptor)); } - fn collect_transactions(&self, watchdogs: &mut BTreeMap) { + // See ordering implementation for OwnedMetadataDescriptor + #[allow(clippy::mutable_key_type)] + fn collect_transactions(&self, watchdogs: &mut BTreeMap) { while let Some((transaction, descriptor)) = self.transactions.pop() { // collect transactions match watchdogs.entry(descriptor) { @@ -102,23 +95,24 @@ impl ConfirmedSegment { } } } -unsafe impl Send for ConfirmedSegment {} -unsafe impl Sync for ConfirmedSegment {} // TODO: optimize confirmation by packing descriptors AND linked table together // TODO: think about linked table cleanup pub struct WatchdogConfirmator { - confirmed: RwLock>>, - segment_transactions: Arc>>, + confirmed: RwLock>>, + segment_transactions: Arc>>, _task: PeriodicTask, } impl WatchdogConfirmator { fn new(interval: Duration) -> Self { - let segment_transactions = Arc::>>::default(); + let segment_transactions = 
Arc::>>::default(); let c_segment_transactions = segment_transactions.clone(); - let mut segments: Vec<(Arc, BTreeMap)> = vec![]; + let mut segments: Vec<( + Arc, + BTreeMap, + )> = vec![]; let task = PeriodicTask::new("Watchdog Confirmator".to_owned(), interval, move || { // add new segments while let Some(new_segment) = c_segment_transactions.as_ref().pop() { @@ -145,47 +139,31 @@ impl WatchdogConfirmator { } } - pub fn add_owned(&self, descriptor: &OwnedDescriptor) -> ZResult { - self.add(&Descriptor::from(descriptor)) - } + pub fn add(&self, descriptor: OwnedMetadataDescriptor) -> ConfirmedDescriptor { + // confirm ASAP! + descriptor.confirm(); - pub fn add(&self, descriptor: &Descriptor) -> ZResult { - let guard = self.confirmed.read().map_err(|e| zerror!("{e}"))?; - if let Some(segment) = guard.get(&descriptor.id) { - return self.link(descriptor, segment); + let guard = zread!(self.confirmed); + if let Some(segment) = guard.get(&descriptor.segment.data.id()) { + return ConfirmedDescriptor::new(descriptor, segment.clone()); } drop(guard); - let segment = Arc::new(Segment::open(descriptor.id)?); - let confirmed_segment = Arc::new(ConfirmedSegment::new(segment)); - let confirmed_descriptoir = self.link(descriptor, &confirmed_segment); + let confirmed_segment = Arc::new(ConfirmedSegment::default()); + let confirmed_descriptoir = + ConfirmedDescriptor::new(descriptor.clone(), confirmed_segment.clone()); - let mut guard = self.confirmed.write().map_err(|e| zerror!("{e}"))?; - match guard.entry(descriptor.id) { + let mut guard = zwrite!(self.confirmed); + match guard.entry(descriptor.segment.data.id()) { std::collections::btree_map::Entry::Vacant(vacant) => { vacant.insert(confirmed_segment.clone()); self.segment_transactions.push(confirmed_segment); confirmed_descriptoir } std::collections::btree_map::Entry::Occupied(occupied) => { - self.link(descriptor, occupied.get()) + // this is intentional + ConfirmedDescriptor::new(descriptor, occupied.get().clone()) } } } - - fn link( - &self, - descriptor: &Descriptor, - segment: &Arc, - ) -> ZResult { - let index = descriptor.index_and_bitpos >> 6; - let bitpos = descriptor.index_and_bitpos & 0x3f; - - let atomic = unsafe { segment.segment.array.elem(index) }; - let mask = 1u64 << bitpos; - - let owned = OwnedDescriptor::new(segment.segment.clone(), atomic, mask); - let confirmed = ConfirmedDescriptor::new(owned, segment.clone()); - Ok(confirmed) - } } diff --git a/commons/zenoh-shm/src/watchdog/descriptor.rs b/commons/zenoh-shm/src/watchdog/descriptor.rs deleted file mode 100644 index 38fddd61e8..0000000000 --- a/commons/zenoh-shm/src/watchdog/descriptor.rs +++ /dev/null @@ -1,116 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::{ - hash::Hash, - sync::{atomic::AtomicU64, Arc}, -}; - -use super::segment::Segment; - -pub type SegmentID = u32; - -#[derive(Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Debug)] -pub struct Descriptor { - pub id: SegmentID, - pub index_and_bitpos: u32, -} - -impl From<&OwnedDescriptor> for Descriptor { - fn from(item: &OwnedDescriptor) -> Self { - let bitpos = { - // TODO: can be optimized - let mut v = item.mask; - let mut bitpos = 0u32; - while v > 1 { - bitpos += 1; - v >>= 1; - } - bitpos - }; - let index = unsafe { item.segment.array.index(item.atomic) }; - let index_and_bitpos = (index << 6) | bitpos; - Descriptor { - id: item.segment.array.id(), - index_and_bitpos, - } - } -} - -#[derive(Clone, Debug)] -pub struct OwnedDescriptor { - segment: Arc, - pub atomic: *const AtomicU64, - pub mask: u64, -} - -unsafe impl Send for OwnedDescriptor {} -unsafe impl Sync for OwnedDescriptor {} - -impl Hash for OwnedDescriptor { - fn hash(&self, state: &mut H) { - self.atomic.hash(state); - self.mask.hash(state); - } -} - -impl OwnedDescriptor { - pub(crate) fn new(segment: Arc, atomic: *const AtomicU64, mask: u64) -> Self { - Self { - segment, - atomic, - mask, - } - } - - pub fn confirm(&self) { - unsafe { - (*self.atomic).fetch_or(self.mask, std::sync::atomic::Ordering::SeqCst); - }; - } - - pub(crate) fn validate(&self) -> u64 { - unsafe { - (*self.atomic).fetch_and(!self.mask, std::sync::atomic::Ordering::SeqCst) & self.mask - } - } - - #[cfg(feature = "test")] - pub fn test_validate(&self) -> u64 { - self.validate() - } -} - -impl Ord for OwnedDescriptor { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - match self.atomic.cmp(&other.atomic) { - core::cmp::Ordering::Equal => {} - ord => return ord, - } - self.mask.cmp(&other.mask) - } -} - -impl PartialOrd for OwnedDescriptor { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for OwnedDescriptor { - fn eq(&self, other: &Self) -> bool { - self.atomic == other.atomic && self.mask == other.mask - } -} -impl Eq for OwnedDescriptor {} diff --git a/commons/zenoh-shm/src/watchdog/mod.rs b/commons/zenoh-shm/src/watchdog/mod.rs index 55267a5442..17f17a9e6d 100644 --- a/commons/zenoh-shm/src/watchdog/mod.rs +++ b/commons/zenoh-shm/src/watchdog/mod.rs @@ -12,13 +12,6 @@ // ZettaScale Zenoh Team, // -pub mod descriptor; - tested_crate_module!(periodic_task); -tested_crate_module!(storage); tested_crate_module!(validator); tested_crate_module!(confirmator); - -pub(crate) mod allocated_watchdog; - -mod segment; diff --git a/commons/zenoh-shm/src/watchdog/segment.rs b/commons/zenoh-shm/src/watchdog/segment.rs deleted file mode 100644 index 5943a10153..0000000000 --- a/commons/zenoh-shm/src/watchdog/segment.rs +++ /dev/null @@ -1,40 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::sync::atomic::AtomicU64; - -use zenoh_result::ZResult; - -use super::descriptor::SegmentID; -use crate::posix_shm::array::ArrayInSHM; - -const WATCHDOG_SEGMENT_PREFIX: &str = "watchdog"; - -#[derive(Debug)] -pub struct Segment { - pub array: ArrayInSHM, -} - -impl Segment { - pub fn create(watchdog_count: usize) -> ZResult { - let elem_count = (watchdog_count + 63) / 64; - let array = ArrayInSHM::create(elem_count, WATCHDOG_SEGMENT_PREFIX)?; - Ok(Self { array }) - } - - pub fn open(id: SegmentID) -> ZResult { - let array = ArrayInSHM::open(id, WATCHDOG_SEGMENT_PREFIX)?; - Ok(Self { array }) - } -} diff --git a/commons/zenoh-shm/src/watchdog/storage.rs b/commons/zenoh-shm/src/watchdog/storage.rs deleted file mode 100644 index ff9772961c..0000000000 --- a/commons/zenoh-shm/src/watchdog/storage.rs +++ /dev/null @@ -1,75 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{ - collections::BTreeSet, - sync::{Arc, Mutex}, -}; - -use static_init::dynamic; -use zenoh_result::{zerror, ZResult}; - -use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment}; - -#[dynamic(lazy, drop)] -pub static mut GLOBAL_STORAGE: WatchdogStorage = WatchdogStorage::new(32768usize).unwrap(); - -pub struct WatchdogStorage { - available: Arc>>, -} - -// TODO: expand and shrink Storage when needed -// OR -// support multiple descriptor assignment (allow multiple buffers to be assigned to the same watchdog) -impl WatchdogStorage { - pub fn new(initial_watchdog_count: usize) -> ZResult { - let segment = Arc::new(Segment::create(initial_watchdog_count)?); - - let mut initially_available = BTreeSet::default(); - let subsegments = segment.array.elem_count(); - for subsegment in 0..subsegments { - let atomic = unsafe { segment.array.elem(subsegment as u32) }; - - for bit in 0..64 { - let mask = 1u64 << bit; - let descriptor = OwnedDescriptor::new(segment.clone(), atomic, mask); - let _new_insert = initially_available.insert(descriptor); - #[cfg(feature = "test")] - assert!(_new_insert); - } - } - - Ok(Self { - available: Arc::new(Mutex::new(initially_available)), - }) - } - - pub fn allocate_watchdog(&self) -> ZResult { - let mut guard = self.available.lock().map_err(|e| zerror!("{e}"))?; - let popped = guard.pop_first(); - drop(guard); - - let allocated = - AllocatedWatchdog::new(popped.ok_or_else(|| zerror!("no free watchdogs available"))?); - - Ok(allocated) - } - - pub(crate) fn free_watchdog(&self, descriptor: OwnedDescriptor) { - if let Ok(mut guard) = self.available.lock() { - let _new_insert = guard.insert(descriptor); - #[cfg(feature = "test")] - assert!(_new_insert); - } - } -} diff --git a/commons/zenoh-shm/src/watchdog/validator.rs b/commons/zenoh-shm/src/watchdog/validator.rs index 5becefb547..d2c4455c0a 100644 --- a/commons/zenoh-shm/src/watchdog/validator.rs +++ b/commons/zenoh-shm/src/watchdog/validator.rs @@ -12,45 +12,45 @@ // ZettaScale Zenoh Team, // -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use 
std::{collections::BTreeSet, sync::Arc, time::Duration}; use static_init::dynamic; -use super::{descriptor::OwnedDescriptor, periodic_task::PeriodicTask}; - -pub(super) type InvalidateCallback = Box; +use super::periodic_task::PeriodicTask; +use crate::metadata::descriptor::OwnedMetadataDescriptor; #[dynamic(lazy, drop)] pub static mut GLOBAL_VALIDATOR: WatchdogValidator = WatchdogValidator::new(Duration::from_millis(100)); enum Transaction { - Add(InvalidateCallback), + Add, Remove, } #[derive(Default)] struct ValidatedStorage { - transactions: lockfree::queue::Queue<(Transaction, OwnedDescriptor)>, + transactions: crossbeam_queue::SegQueue<(Transaction, OwnedMetadataDescriptor)>, } impl ValidatedStorage { - fn add(&self, descriptor: OwnedDescriptor, on_invalidated: InvalidateCallback) { - self.transactions - .push((Transaction::Add(on_invalidated), descriptor)); + fn add(&self, descriptor: OwnedMetadataDescriptor) { + self.transactions.push((Transaction::Add, descriptor)); } - fn remove(&self, descriptor: OwnedDescriptor) { + fn remove(&self, descriptor: OwnedMetadataDescriptor) { self.transactions.push((Transaction::Remove, descriptor)); } - fn collect_transactions(&self, storage: &mut BTreeMap) { + // See ordering implementation for OwnedMetadataDescriptor + #[allow(clippy::mutable_key_type)] + fn collect_transactions(&self, storage: &mut BTreeSet) { while let Some((transaction, descriptor)) = self.transactions.pop() { match transaction { - Transaction::Add(on_invalidated) => { - let _old = storage.insert(descriptor, on_invalidated); + Transaction::Add => { + let _old = storage.insert(descriptor); #[cfg(feature = "test")] - assert!(_old.is_none()); + assert!(_old); } Transaction::Remove => { let _ = storage.remove(&descriptor); @@ -71,14 +71,19 @@ impl WatchdogValidator { let storage = Arc::new(ValidatedStorage::default()); let c_storage = storage.clone(); - let mut watchdogs = BTreeMap::default(); + // See ordering implementation for OwnedMetadataDescriptor + #[allow(clippy::mutable_key_type)] + let mut watchdogs = BTreeSet::default(); let task = PeriodicTask::new("Watchdog Validator".to_owned(), interval, move || { c_storage.collect_transactions(&mut watchdogs); - watchdogs.retain(|watchdog, on_invalidated| { + watchdogs.retain(|watchdog| { let old_val = watchdog.validate(); if old_val == 0 { - on_invalidated(); + watchdog + .header() + .watchdog_invalidated + .store(true, std::sync::atomic::Ordering::Relaxed); return false; } true @@ -91,11 +96,11 @@ impl WatchdogValidator { } } - pub fn add(&self, watchdog: OwnedDescriptor, on_invalidated: InvalidateCallback) { - self.storage.add(watchdog, on_invalidated); + pub fn add(&self, watchdog: OwnedMetadataDescriptor) { + self.storage.add(watchdog); } - pub fn remove(&self, watchdog: OwnedDescriptor) { + pub fn remove(&self, watchdog: OwnedMetadataDescriptor) { self.storage.remove(watchdog); } } diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs deleted file mode 100644 index 747757a3b2..0000000000 --- a/commons/zenoh-shm/tests/header.rs +++ /dev/null @@ -1,130 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -#![cfg(feature = "test")] -use std::sync::atomic::Ordering::Relaxed; - -use rand::Rng; -use zenoh_result::ZResult; -use zenoh_shm::header::{ - descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, - subscription::GLOBAL_HEADER_SUBSCRIPTION, -}; - -pub mod common; -use common::execute_concurrent; - -fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { - |_task_index: usize, _iteration: usize| -> ZResult<()> { - let _allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?; - Ok(()) - } -} - -#[test] -fn header_alloc() { - execute_concurrent(1, 1000, header_alloc_fn()); -} - -#[test] -fn header_alloc_concurrent() { - execute_concurrent(100, 1000, header_alloc_fn()); -} - -fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { - |_task_index: usize, _iteration: usize| { - let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?; - let descr = HeaderDescriptor::from(&allocated_header.descriptor); - let _linked_header = GLOBAL_HEADER_SUBSCRIPTION.read().link(&descr)?; - Ok(()) - } -} - -#[test] -fn header_link() { - execute_concurrent(1, 1000, header_link_fn()); -} - -#[test] -fn header_link_concurrent() { - execute_concurrent(100, 1000, header_link_fn()); -} - -fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static -{ - |_task_index: usize, _iteration: usize| { - let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?; - let descr = HeaderDescriptor::from(&allocated_header.descriptor); - drop(allocated_header); - - // Some comments on this behaviour... - // Even though the allocated_header is dropped, it's SHM segment still exists in GLOBAL_HEADER_STORAGE, - // so there is no way to detect that header is "deallocated" and the code below succeeds. The invalidation - // functionality is implemented on higher level by means of generation mechanism and protects from both header - // and watchdog link-to-deallocated issues. 
This generation mechanism depends on the behaviour below, so - // everything is fair :) - let _linked_header = GLOBAL_HEADER_SUBSCRIPTION.read().link(&descr)?; - Ok(()) - } -} - -#[test] -fn header_link_failure() { - execute_concurrent(1, 1000, header_link_failure_fn()); -} - -#[test] -fn header_link_failure_concurrent() { - execute_concurrent(100, 1000, header_link_failure_fn()); -} - -fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { - let task_fun = |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?; - let descr = HeaderDescriptor::from(&allocated_header.descriptor); - let linked_header = GLOBAL_HEADER_SUBSCRIPTION.read().link(&descr)?; - - let mut rng = rand::thread_rng(); - let allocated = allocated_header.descriptor.header(); - let linked = linked_header.header(); - for _ in 0..100 { - let gen = rng.gen(); - allocated.generation.store(gen, Relaxed); - assert_eq!(gen, linked.generation.load(Relaxed)); - - let rc = rng.gen(); - allocated.refcount.store(rc, Relaxed); - assert_eq!(rc, linked.refcount.load(Relaxed)); - - let watchdog_inv = rng.gen(); - allocated.watchdog_invalidated.store(watchdog_inv, Relaxed); - assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); - - assert_eq!(gen, linked.generation.load(Relaxed)); - assert_eq!(rc, linked.refcount.load(Relaxed)); - assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); - } - Ok(()) - }; - execute_concurrent(parallel_tasks, iterations, task_fun); -} - -#[test] -fn header_check_memory() { - header_check_memory_fn(1, 1000); -} - -#[test] -fn header_check_memory_concurrent() { - header_check_memory_fn(100, 100); -} diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/metadata.rs similarity index 53% rename from commons/zenoh-shm/tests/watchdog.rs rename to commons/zenoh-shm/tests/metadata.rs index bc4a75dfa9..4c90b4b623 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/metadata.rs @@ -12,43 +12,134 @@ // ZettaScale Zenoh Team, // #![cfg(feature = "test")] -use std::{ - sync::{atomic::AtomicBool, Arc}, - time::Duration, -}; +use std::{ops::Deref, sync::atomic::Ordering::Relaxed, time::Duration}; -use zenoh_result::{bail, ZResult}; -use zenoh_shm::watchdog::{ - confirmator::GLOBAL_CONFIRMATOR, storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR, -}; +use rand::Rng; +use zenoh_core::bail; +use zenoh_result::ZResult; pub mod common; use common::{execute_concurrent, CpuLoad}; +use zenoh_shm::{ + metadata::{ + descriptor::MetadataDescriptor, storage::GLOBAL_METADATA_STORAGE, + subscription::GLOBAL_METADATA_SUBSCRIPTION, + }, + watchdog::{confirmator::GLOBAL_CONFIRMATOR, validator::GLOBAL_VALIDATOR}, +}; -const VALIDATION_PERIOD: Duration = Duration::from_millis(100); -const CONFIRMATION_PERIOD: Duration = Duration::from_millis(50); - -fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { +fn metadata_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let _allocated = GLOBAL_STORAGE.read().allocate_watchdog()?; + let _allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?; Ok(()) } } #[test] -fn watchdog_alloc() { - execute_concurrent(1, 10000, watchdog_alloc_fn()); +fn metadata_alloc() { + execute_concurrent(1, 1000, metadata_alloc_fn()); } #[test] -fn watchdog_alloc_concurrent() { - execute_concurrent(1000, 10000, 
watchdog_alloc_fn()); +fn metadata_alloc_concurrent() { + execute_concurrent(100, 1000, metadata_alloc_fn()); } +fn metadata_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| { + let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?; + let descr = MetadataDescriptor::from(allocated_metadata.deref()); + let _linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?; + Ok(()) + } +} + +#[test] +fn metadata_link() { + execute_concurrent(1, 1000, metadata_link_fn()); +} + +#[test] +fn metadata_link_concurrent() { + execute_concurrent(100, 1000, metadata_link_fn()); +} + +fn metadata_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static +{ + |_task_index: usize, _iteration: usize| { + let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?; + let descr = MetadataDescriptor::from(allocated_metadata.deref()); + drop(allocated_metadata); + + // Some comments on this behaviour... + // Even though the allocated_metadata is dropped, its SHM segment still exists in GLOBAL_METADATA_STORAGE, + // so there is no way to detect that metadata is "deallocated" and the code below succeeds. The invalidation + // functionality is implemented on higher level by means of a generation mechanism that protects from both metadata + // and watchdog link-to-deallocated issues. This generation mechanism depends on the behaviour below, so + // everything is fair :) + let _linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?; + Ok(()) + } +} + +#[test] +fn metadata_link_failure() { + execute_concurrent(1, 1000, metadata_link_failure_fn()); +} + +#[test] +fn metadata_link_failure_concurrent() { + execute_concurrent(100, 1000, metadata_link_failure_fn()); +} + +fn metadata_check_memory_fn(parallel_tasks: usize, iterations: usize) { + let task_fun = |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?; + let descr = MetadataDescriptor::from(allocated_metadata.deref()); + let linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?; + + let mut rng = rand::thread_rng(); + let allocated = allocated_metadata.header(); + let linked = linked_metadata.header(); + for _ in 0..100 { + let gen = rng.gen(); + allocated.generation.store(gen, Relaxed); + assert_eq!(gen, linked.generation.load(Relaxed)); + + let rc = rng.gen(); + allocated.refcount.store(rc, Relaxed); + assert_eq!(rc, linked.refcount.load(Relaxed)); + + let watchdog_inv = rng.gen(); + allocated.watchdog_invalidated.store(watchdog_inv, Relaxed); + assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); + + assert_eq!(gen, linked.generation.load(Relaxed)); + assert_eq!(rc, linked.refcount.load(Relaxed)); + assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); + } + Ok(()) + }; + execute_concurrent(parallel_tasks, iterations, task_fun); +} + +#[test] +fn metadata_check_memory() { + metadata_check_memory_fn(1, 1000); +} + +#[test] +fn metadata_check_memory_concurrent() { + metadata_check_memory_fn(100, 100); +} + +const VALIDATION_PERIOD: Duration = Duration::from_millis(100); +const CONFIRMATION_PERIOD: Duration = Duration::from_millis(50); + fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated = GLOBAL_STORAGE.read().allocate_watchdog()?; - let confirmed = 
GLOBAL_CONFIRMATOR.read().add_owned(&allocated.descriptor)?; + let allocated = GLOBAL_METADATA_STORAGE.read().allocate()?; + let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone()); // check that the confirmed watchdog stays valid for i in 0..10 { @@ -80,14 +171,11 @@ fn watchdog_confirmed_concurrent() { #[test] #[ignore] fn watchdog_confirmed_dangling() { - let allocated = GLOBAL_STORAGE + let allocated = GLOBAL_METADATA_STORAGE .read() - .allocate_watchdog() + .allocate() .expect("error allocating watchdog!"); - let confirmed = GLOBAL_CONFIRMATOR - .read() - .add_owned(&allocated.descriptor) - .expect("error adding watchdog to confirmator!"); + let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone()); drop(allocated); // confirm dangling (not allocated) watchdog @@ -99,24 +187,19 @@ fn watchdog_confirmed_dangling() { fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated = GLOBAL_STORAGE.read().allocate_watchdog()?; - let confirmed = GLOBAL_CONFIRMATOR.read().add_owned(&allocated.descriptor)?; - - let valid = Arc::new(AtomicBool::new(true)); - { - let c_valid = valid.clone(); - GLOBAL_VALIDATOR.read().add( - allocated.descriptor.clone(), - Box::new(move || { - c_valid.store(false, std::sync::atomic::Ordering::SeqCst); - }), - ); - } + let allocated = GLOBAL_METADATA_STORAGE.read().allocate()?; + let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone()); + + GLOBAL_VALIDATOR.read().add(allocated.clone()); // check that the watchdog stays valid as it is confirmed for i in 0..10 { std::thread::sleep(VALIDATION_PERIOD); - if !valid.load(std::sync::atomic::Ordering::SeqCst) { + if allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst) + { bail!("Invalid watchdog, iteration {i}"); } } @@ -130,7 +213,10 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen // check that the watchdog becomes invalid once we stop it's confirmation drop(confirmed); std::thread::sleep(VALIDATION_PERIOD * 3 + CONFIRMATION_PERIOD); - assert!(!valid.load(std::sync::atomic::Ordering::SeqCst)); + assert!(allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); Ok(()) } @@ -151,27 +237,23 @@ fn watchdog_validated_concurrent() { fn watchdog_validated_invalid_without_confirmator_fn( ) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated = GLOBAL_STORAGE + let allocated = GLOBAL_METADATA_STORAGE .read() - .allocate_watchdog() + .allocate() .expect("error allocating watchdog!"); - let valid = Arc::new(AtomicBool::new(true)); - { - let c_valid = valid.clone(); - GLOBAL_VALIDATOR.read().add( - allocated.descriptor.clone(), - Box::new(move || { - c_valid.store(false, std::sync::atomic::Ordering::SeqCst); - }), - ); - } + assert!(allocated.test_validate() == 0); - assert!(allocated.descriptor.test_validate() == 0); + // add watchdog to validator + GLOBAL_VALIDATOR.read().add(allocated.clone()); // check that the watchdog becomes invalid because we do not confirm it std::thread::sleep(VALIDATION_PERIOD * 2 + CONFIRMATION_PERIOD); - assert!(!valid.load(std::sync::atomic::Ordering::SeqCst)); + assert!(allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); + Ok(()) } } @@ -195,26 +277,14 @@ fn watchdog_validated_invalid_without_confirmator_concurrent() { fn 
watchdog_validated_additional_confirmation_fn( ) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated = GLOBAL_STORAGE + let allocated = GLOBAL_METADATA_STORAGE .read() - .allocate_watchdog() + .allocate() .expect("error allocating watchdog!"); - let confirmed = GLOBAL_CONFIRMATOR - .read() - .add_owned(&allocated.descriptor) - .expect("error adding watchdog to confirmator!"); - - let allow_invalid = Arc::new(AtomicBool::new(false)); - { - let c_allow_invalid = allow_invalid.clone(); - GLOBAL_VALIDATOR.read().add( - allocated.descriptor.clone(), - Box::new(move || { - assert!(c_allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); - c_allow_invalid.store(false, std::sync::atomic::Ordering::SeqCst); - }), - ); - } + let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone()); + + // add watchdog to validator + GLOBAL_VALIDATOR.read().add(allocated.clone()); // make additional confirmations for _ in 0..100 { @@ -224,6 +294,10 @@ fn watchdog_validated_additional_confirmation_fn( // check that the watchdog stays valid as we stop additional confirmation std::thread::sleep(VALIDATION_PERIOD * 10); + assert!(!allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); // Worst-case timings: // validation: |___________|___________|___________|___________| @@ -233,10 +307,13 @@ fn watchdog_validated_additional_confirmation_fn( // check that the watchdog becomes invalid once we stop it's regular confirmation drop(confirmed); - allow_invalid.store(true, std::sync::atomic::Ordering::SeqCst); std::thread::sleep(VALIDATION_PERIOD * 2 + CONFIRMATION_PERIOD); // check that invalidation event happened! - assert!(!allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); + assert!(allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); + Ok(()) } } @@ -256,29 +333,21 @@ fn watchdog_validated_additional_confirmation_concurrent() { fn watchdog_validated_overloaded_system_fn( ) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { - let allocated = GLOBAL_STORAGE + let allocated = GLOBAL_METADATA_STORAGE .read() - .allocate_watchdog() + .allocate() .expect("error allocating watchdog!"); - let confirmed = GLOBAL_CONFIRMATOR - .read() - .add_owned(&allocated.descriptor) - .expect("error adding watchdog to confirmator!"); - - let allow_invalid = Arc::new(AtomicBool::new(false)); - { - let c_allow_invalid = allow_invalid.clone(); - GLOBAL_VALIDATOR.read().add( - allocated.descriptor.clone(), - Box::new(move || { - assert!(c_allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); - c_allow_invalid.store(false, std::sync::atomic::Ordering::SeqCst); - }), - ); - } + let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone()); + + // add watchdog to validator + GLOBAL_VALIDATOR.read().add(allocated.clone()); // check that the watchdog stays valid std::thread::sleep(VALIDATION_PERIOD * 10); + assert!(!allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); // Worst-case timings: // validation: |___________|___________|___________|___________| @@ -288,10 +357,13 @@ fn watchdog_validated_overloaded_system_fn( // check that the watchdog becomes invalid once we stop it's regular confirmation drop(confirmed); - allow_invalid.store(true, std::sync::atomic::Ordering::SeqCst); std::thread::sleep(VALIDATION_PERIOD * 2 + 
CONFIRMATION_PERIOD); // check that invalidation event happened! - assert!(!allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); + assert!(allocated + .header() + .watchdog_invalidated + .load(std::sync::atomic::Ordering::SeqCst)); + Ok(()) } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index c562e47135..35aa2c7396 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -293,7 +293,7 @@ fn to_shm_partner( let mut res = false; for zs in zbuf.zslices_mut() { if let Some(shmb) = zs.downcast_ref::() { - if partner_shm_cfg.supports_protocol(shmb.info.shm_protocol) { + if partner_shm_cfg.supports_protocol(shmb.protocol()) { *zs = shmbuf_to_shminfo(shmb)?; res = true; } else { @@ -331,7 +331,7 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; // Mount shmbuf - let smb = shmr.read_shmbuf(&shmbinfo)?; + let smb = shmr.read_shmbuf(shmbinfo)?; // Replace the content of the slice *zslice = smb.into(); diff --git a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs index 35eff8a809..0d0d8f0bb7 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/shm.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/shm.rs @@ -24,7 +24,9 @@ use zenoh_core::bail; use zenoh_crypto::PseudoRng; use zenoh_protocol::transport::{init, open}; use zenoh_result::{zerror, Error as ZError, ZResult}; -use zenoh_shm::{api::common::types::ProtocolID, posix_shm::array::ArrayInSHM}; +use zenoh_shm::{ + api::common::types::ProtocolID, posix_shm::array::ArrayInSHM, version::SHM_VERSION, +}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -38,7 +40,8 @@ pub(crate) type AuthChallenge = u64; const LEN_INDEX: usize = 0; const CHALLENGE_INDEX: usize = 1; -const ID_START_INDEX: usize = 2; +const VERSION_INDEX: usize = 2; +const ID_START_INDEX: usize = 3; #[derive(Debug)] pub struct AuthSegment { @@ -53,7 +56,10 @@ impl AuthSegment { )?; unsafe { (*array.elem_mut(LEN_INDEX)) = shm_protocols.len() as AuthChallenge; - (*array.elem_mut(CHALLENGE_INDEX)) = challenge; + // challenge field is inverted to prevent SHM probing between new versioned + // SHM implementation and the old one + (*array.elem_mut(CHALLENGE_INDEX)) = !challenge; + (*array.elem_mut(VERSION_INDEX)) = SHM_VERSION; for elem in ID_START_INDEX..array.elem_count() { (*array.elem_mut(elem)) = shm_protocols[elem - ID_START_INDEX] as u64; } @@ -63,11 +69,45 @@ impl AuthSegment { pub fn open(id: AuthSegmentID) -> ZResult { let array = ArrayInSHM::open(id, AUTH_SEGMENT_PREFIX)?; + + // validate minimal array length + if array.elem_count() < ID_START_INDEX { + bail!("SHM auth segment is too small, maybe the other side is using an incompatible SHM version?") + } + Ok(Self { array }) } pub fn challenge(&self) -> AuthChallenge { - unsafe { *self.array.elem(CHALLENGE_INDEX) } + // challenge field is inverted to prevent SHM probing between new versioned + // SHM implementation and the old one + unsafe { !(*self.array.elem(CHALLENGE_INDEX)) } + } + + pub fn validate_challenge(&self, expected_challenge: AuthChallenge, s: &str) -> bool { + let challnge_in_shm = self.challenge(); + if challnge_in_shm != expected_challenge { + tracing::debug!( + "{} Challenge mismatch: expected: {}, found in shm: {}.", + s, + expected_challenge, + challnge_in_shm + ); + return false; + } + + let version_in_shm = unsafe { 
*self.array.elem(VERSION_INDEX) }; + if version_in_shm != SHM_VERSION { + tracing::debug!( + "{} Version mismatch: ours: {}, theirs: {}.", + s, + SHM_VERSION, + version_in_shm + ); + return false; + } + + true } pub fn protocols(&self) -> Vec { @@ -280,17 +320,8 @@ impl<'a> OpenFsm for &'a ShmFsm<'a> { return Ok(None); }; - // Alice challenge as seen by Alice - let challenge = self.inner.challenge(); - // Verify that Bob has correctly read Alice challenge - if challenge != init_ack.alice_challenge { - tracing::trace!( - "{} Challenge mismatch: {} != {}.", - S, - init_ack.alice_challenge, - challenge - ); + if !self.inner.validate_challenge(init_ack.alice_challenge, S) { return Ok(None); } @@ -455,18 +486,9 @@ impl<'a> AcceptFsm for &'a ShmFsm<'a> { return Ok(()); }; - // Bob challenge as seen by Bob - let challenge = self.inner.challenge(); - // Verify that Alice has correctly read Bob challenge let bob_challnge = ext.value; - if challenge != bob_challnge { - tracing::trace!( - "{} Challenge mismatch: {} != {}.", - S, - bob_challnge, - challenge - ); + if !self.inner.validate_challenge(bob_challnge, S) { return Ok(()); } diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 83dcc2d3d9..bac40d84f2 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -937,8 +937,8 @@ async fn three_node_combination_multicast() -> Result<()> { ]; let mut idx = 0; - // Ports going to be used: 17511 .. 17535 - let base_port = 17510; + // Ports going to be used: 18511 .. 18535 + let base_port = 18510; let recipe_list: Vec<_> = modes .map(|n1| modes.map(|n2| (n1, n2)))