From 25f7b05e3647738ded84dc43ed5a1b227fe788d1 Mon Sep 17 00:00:00 2001
From: yellowhatter
Date: Fri, 17 Jan 2025 18:12:57 +0300
Subject: [PATCH] no need to explicitly store ChunkDescriptor in BusyChunk,
 as it is now stored in the shared metadata

---
 .../src/api/provider/shm_provider.rs          | 36 ++++++++-----------
 commons/zenoh-shm/src/header/chunk_header.rs  | 30 +++++++++++++---
 commons/zenoh-shm/src/reader.rs               | 26 ++++++++------
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs
index eb01351c7..d84777378 100644
--- a/commons/zenoh-shm/src/api/provider/shm_provider.rs
+++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs
@@ -49,16 +49,16 @@ use crate::{
 
 #[derive(Debug)]
 struct BusyChunk {
-    descriptor: ChunkDescriptor,
     metadata: AllocatedMetadataDescriptor,
 }
 
 impl BusyChunk {
-    fn new(descriptor: ChunkDescriptor, metadata: AllocatedMetadataDescriptor) -> Self {
-        Self {
-            descriptor,
-            metadata,
-        }
+    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
+        Self { metadata }
+    }
+
+    fn descriptor(&self) -> ChunkDescriptor {
+        self.metadata.header().data_descriptor()
     }
 }
 
@@ -219,7 +219,7 @@ impl ForceDeallocPolicy for DeallocOptimal {
         };
         drop(guard);
 
-        provider.backend.free(&chunk_to_dealloc.descriptor);
+        provider.backend.free(&chunk_to_dealloc.descriptor());
         true
     }
 }
@@ -233,7 +233,7 @@ impl ForceDeallocPolicy for DeallocYoungest {
     ) -> bool {
         match provider.busy_list.lock().unwrap().pop_back() {
             Some(val) => {
-                provider.backend.free(&val.descriptor);
+                provider.backend.free(&val.descriptor());
                 true
             }
             None => false,
@@ -250,7 +250,7 @@ impl ForceDeallocPolicy for DeallocEldest {
     ) -> bool {
         match provider.busy_list.lock().unwrap().pop_front() {
             Some(val) => {
-                provider.backend.free(&val.descriptor);
+                provider.backend.free(&val.descriptor());
                 true
             }
             None => false,
@@ -839,8 +839,9 @@ where
         guard.retain(|maybe_free| {
             if is_free_chunk(maybe_free) {
                 tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
-                self.backend.free(&maybe_free.descriptor);
-                largest = largest.max(maybe_free.descriptor.len.get());
+                let descriptor_to_free = maybe_free.descriptor();
+                self.backend.free(&descriptor_to_free);
+                largest = largest.max(descriptor_to_free.len.get());
                 return false;
             }
             true
@@ -912,16 +913,7 @@ where
             // chunk descriptor
             allocated_metadata
                 .header()
-                .segment
-                .store(chunk.descriptor.segment, Ordering::Relaxed);
-            allocated_metadata
-                .header()
-                .chunk
-                .store(chunk.descriptor.chunk, Ordering::Relaxed);
-            allocated_metadata
-                .header()
-                .len
-                .store(chunk.descriptor.len.into(), Ordering::Relaxed);
+                .set_data_descriptor(&chunk.descriptor);
             // protocol
             allocated_metadata
                 .header()
@@ -954,7 +946,7 @@ where
         self.busy_list
             .lock()
            .unwrap()
-            .push_back(BusyChunk::new(chunk.descriptor, allocated_metadata));
+            .push_back(BusyChunk::new(allocated_metadata));
 
         shmb
     }
diff --git a/commons/zenoh-shm/src/header/chunk_header.rs b/commons/zenoh-shm/src/header/chunk_header.rs
index abc3f2599..f5d0ca1c7 100644
--- a/commons/zenoh-shm/src/header/chunk_header.rs
+++ b/commons/zenoh-shm/src/header/chunk_header.rs
@@ -12,7 +12,12 @@
 // ZettaScale Zenoh Team,
 //
 
-use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};
+use std::{
+    num::NonZeroUsize,
+    sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
+};
+
+use crate::api::provider::chunk::ChunkDescriptor;
 
 // Chunk header
 #[stabby::stabby]
@@ -30,7 +35,24 @@ pub struct ChunkHeaderType {
 
     pub protocol: AtomicU32,
     /// The data chunk descriptor
-    pub segment: AtomicU32,
-    pub chunk: AtomicU32,
-    pub len: AtomicUsize,
+    segment: AtomicU32,
+    chunk: AtomicU32,
+    len: AtomicUsize,
+}
+
+impl ChunkHeaderType {
+    pub fn set_data_descriptor(&self, descriptor: &ChunkDescriptor) {
+        self.segment.store(descriptor.segment, Ordering::Relaxed);
+        self.chunk.store(descriptor.chunk, Ordering::Relaxed);
+        self.len.store(descriptor.len.into(), Ordering::Relaxed);
+    }
+
+    pub fn data_descriptor(&self) -> ChunkDescriptor {
+        ChunkDescriptor::new(
+            self.segment.load(Ordering::Relaxed),
+            self.chunk.load(Ordering::Relaxed),
+            // SAFETY: `len` is only written by `set_data_descriptor`, which stores a `NonZeroUsize`, so a non-zero value is always read back
+            unsafe { NonZeroUsize::new_unchecked(self.len.load(Ordering::Relaxed)) },
+        )
+    }
 }
diff --git a/commons/zenoh-shm/src/reader.rs b/commons/zenoh-shm/src/reader.rs
index 675177907..f65a12fc3 100644
--- a/commons/zenoh-shm/src/reader.rs
+++ b/commons/zenoh-shm/src/reader.rs
@@ -22,8 +22,8 @@ use crate::{
         client::shm_segment::ShmSegment,
         client_storage::ShmClientStorage,
         common::types::{ProtocolID, SegmentID},
+        provider::chunk::ChunkDescriptor,
     },
-    header::chunk_header::ChunkHeaderType,
     metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION,
     watchdog::confirmator::GLOBAL_CONFIRMATOR,
     ShmBufInfo, ShmBufInner,
@@ -55,14 +55,18 @@
         // attach to the watchdog before doing other things
         let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata));
 
-        let segment = self.ensure_data_segment(confirmed_metadata.owned.header())?;
-        let buf = segment.map(
+        // retrieve data descriptor from metadata
+        let data_descriptor = confirmed_metadata.owned.header().data_descriptor();
+
+        let segment = self.ensure_data_segment(
             confirmed_metadata
                 .owned
                 .header()
-                .chunk
+                .protocol
                 .load(std::sync::atomic::Ordering::Relaxed),
-        )?;
+            &data_descriptor,
+        )?;
+        let buf = segment.map(data_descriptor.chunk)?;
         let shmb = ShmBufInner {
             metadata: confirmed_metadata,
             buf,
@@ -76,11 +80,12 @@
         }
     }
 
-    fn ensure_data_segment(&self, header: &ChunkHeaderType) -> ZResult<Arc<dyn ShmSegment>> {
-        let id = GlobalDataSegmentID::new(
-            header.protocol.load(std::sync::atomic::Ordering::Relaxed),
-            header.segment.load(std::sync::atomic::Ordering::Relaxed),
-        );
+    fn ensure_data_segment(
+        &self,
+        protocol_id: ProtocolID,
+        descriptor: &ChunkDescriptor,
+    ) -> ZResult<Arc<dyn ShmSegment>> {
+        let id = GlobalDataSegmentID::new(protocol_id, descriptor.segment);
 
         // fastest path: try to get access to already mounted SHM segment
         // read lock allows concurrent execution of multiple requests
@@ -110,8 +115,7 @@
 
             // (common case) mount a new segment and add it to the map
             std::collections::hash_map::Entry::Vacant(vacant) => {
-                let new_segment =
-                    client.attach(header.segment.load(std::sync::atomic::Ordering::Relaxed))?;
+                let new_segment = client.attach(descriptor.segment)?;
                 Ok(vacant.insert(new_segment).clone())
             }
         }
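
Note for reviewers: the round-trip this patch relies on can be exercised in isolation. The provider packs the three ChunkDescriptor fields into the shared header's atomics via set_data_descriptor(), and both BusyChunk::descriptor() and the reader rebuild the descriptor on demand via data_descriptor(). Below is a minimal, self-contained sketch of that pattern; Header and this ChunkDescriptor are hypothetical stand-ins that only mirror the shapes visible in the diff above, not the real zenoh-shm types.

use std::{
    num::NonZeroUsize,
    sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};

// Hypothetical stand-in mirroring zenoh-shm's ChunkDescriptor shape.
struct ChunkDescriptor {
    segment: u32,
    chunk: u32,
    len: NonZeroUsize,
}

// Hypothetical stand-in mirroring ChunkHeaderType: the descriptor lives in
// shared metadata as plain atomics and is rebuilt on demand.
#[derive(Default)]
struct Header {
    segment: AtomicU32,
    chunk: AtomicU32,
    len: AtomicUsize,
}

impl Header {
    // Pack the descriptor fields into the shared atomics (provider side).
    fn set_data_descriptor(&self, d: &ChunkDescriptor) {
        self.segment.store(d.segment, Ordering::Relaxed);
        self.chunk.store(d.chunk, Ordering::Relaxed);
        self.len.store(d.len.get(), Ordering::Relaxed);
    }

    // Rebuild the descriptor from the shared atomics (reader side).
    fn data_descriptor(&self) -> ChunkDescriptor {
        ChunkDescriptor {
            segment: self.segment.load(Ordering::Relaxed),
            chunk: self.chunk.load(Ordering::Relaxed),
            // SAFETY: `len` is only written by `set_data_descriptor` from a
            // NonZeroUsize; like the patch, this sketch assumes the header is
            // initialized before any read, so the loaded value is never zero.
            len: unsafe { NonZeroUsize::new_unchecked(self.len.load(Ordering::Relaxed)) },
        }
    }
}

fn main() {
    let header = Header::default();
    header.set_data_descriptor(&ChunkDescriptor {
        segment: 7,
        chunk: 42,
        len: NonZeroUsize::new(1024).unwrap(),
    });
    let d = header.data_descriptor();
    assert_eq!((d.segment, d.chunk, d.len.get()), (7, 42, 1024));
}

As in the patch, the trade-off is that BusyChunk no longer carries a copy that could drift out of sync with the shared header, at the cost of re-reading three relaxed atomics whenever the descriptor is needed.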