no need to explicitly store ChunkDescriptor in BusyChunk as now it is stored in shared metadata
yellowhatter committed Jan 17, 2025
1 parent f2df6e8 commit 25f7b05
Showing 3 changed files with 55 additions and 37 deletions.
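
In short: the chunk descriptor that `BusyChunk` used to carry as its own field is now recovered on demand from the shared metadata header. Below is a minimal self-contained sketch of the before/after shape; the types and field layouts are simplified stand-ins, not the actual zenoh-shm definitions.

```rust
use std::sync::Arc;

// Stand-ins for the real zenoh-shm types, only to illustrate the ownership change.
#[derive(Clone, Debug)]
struct ChunkDescriptor {
    segment: u32,
    chunk: u32,
    len: usize,
}

// In the real code the descriptor fields live as atomics in a shared-memory header.
struct MetadataHeader {
    descriptor: ChunkDescriptor,
}

struct AllocatedMetadataDescriptor {
    header: Arc<MetadataHeader>,
}

impl AllocatedMetadataDescriptor {
    fn header(&self) -> &MetadataHeader {
        &self.header
    }
}

// Before: BusyChunk stored both a ChunkDescriptor and the metadata handle.
// After: it keeps only the metadata handle and re-derives the descriptor from it.
struct BusyChunk {
    metadata: AllocatedMetadataDescriptor,
}

impl BusyChunk {
    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
        Self { metadata }
    }

    fn descriptor(&self) -> ChunkDescriptor {
        self.metadata.header().descriptor.clone()
    }
}

fn main() {
    let header = Arc::new(MetadataHeader {
        descriptor: ChunkDescriptor { segment: 1, chunk: 7, len: 1024 },
    });
    let busy = BusyChunk::new(AllocatedMetadataDescriptor { header });
    // e.g. when the chunk is deallocated or garbage-collected, the descriptor is read back here:
    println!("freeing {:?}", busy.descriptor());
}
```

The three diffs below are the actual change: the provider drops its cached copy, the header gains `set_data_descriptor`/`data_descriptor` accessors, and the reader rebuilds the descriptor from metadata.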
36 changes: 14 additions & 22 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
@@ -49,16 +49,16 @@ use crate::{
 
 #[derive(Debug)]
 struct BusyChunk {
-    descriptor: ChunkDescriptor,
     metadata: AllocatedMetadataDescriptor,
 }
 
 impl BusyChunk {
-    fn new(descriptor: ChunkDescriptor, metadata: AllocatedMetadataDescriptor) -> Self {
-        Self {
-            descriptor,
-            metadata,
-        }
+    fn new(metadata: AllocatedMetadataDescriptor) -> Self {
+        Self { metadata }
     }
+
+    fn descriptor(&self) -> ChunkDescriptor {
+        self.metadata.header().data_descriptor()
+    }
 }
 
@@ -219,7 +219,7 @@ impl ForceDeallocPolicy for DeallocOptimal {
         };
         drop(guard);
 
-        provider.backend.free(&chunk_to_dealloc.descriptor);
+        provider.backend.free(&chunk_to_dealloc.descriptor());
         true
     }
 }
@@ -233,7 +233,7 @@ impl ForceDeallocPolicy for DeallocYoungest {
     ) -> bool {
         match provider.busy_list.lock().unwrap().pop_back() {
             Some(val) => {
-                provider.backend.free(&val.descriptor);
+                provider.backend.free(&val.descriptor());
                 true
             }
             None => false,
@@ -250,7 +250,7 @@ impl ForceDeallocPolicy for DeallocEldest {
     ) -> bool {
         match provider.busy_list.lock().unwrap().pop_front() {
             Some(val) => {
-                provider.backend.free(&val.descriptor);
+                provider.backend.free(&val.descriptor());
                 true
             }
             None => false,
@@ -839,8 +839,9 @@ where
         guard.retain(|maybe_free| {
             if is_free_chunk(maybe_free) {
                 tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
-                self.backend.free(&maybe_free.descriptor);
-                largest = largest.max(maybe_free.descriptor.len.get());
+                let descriptor_to_free = maybe_free.descriptor();
+                self.backend.free(&descriptor_to_free);
+                largest = largest.max(descriptor_to_free.len.get());
                 return false;
             }
             true
@@ -912,16 +913,7 @@ where
         // chunk descriptor
         allocated_metadata
             .header()
-            .segment
-            .store(chunk.descriptor.segment, Ordering::Relaxed);
-        allocated_metadata
-            .header()
-            .chunk
-            .store(chunk.descriptor.chunk, Ordering::Relaxed);
-        allocated_metadata
-            .header()
-            .len
-            .store(chunk.descriptor.len.into(), Ordering::Relaxed);
+            .set_data_descriptor(&chunk.descriptor);
         // protocol
         allocated_metadata
             .header()
@@ -954,7 +946,7 @@ where
         self.busy_list
             .lock()
             .unwrap()
-            .push_back(BusyChunk::new(chunk.descriptor, allocated_metadata));
+            .push_back(BusyChunk::new(allocated_metadata));
 
         shmb
     }
30 changes: 26 additions & 4 deletions commons/zenoh-shm/src/header/chunk_header.rs
@@ -12,7 +12,12 @@
 // ZettaScale Zenoh Team, <[email protected]>
 //
 
-use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};
+use std::{
+    num::NonZeroUsize,
+    sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
+};
+
+use crate::api::provider::chunk::ChunkDescriptor;
 
 // Chunk header
 #[stabby::stabby]
@@ -30,7 +35,24 @@ pub struct ChunkHeaderType {
     pub protocol: AtomicU32,
 
     /// The data chunk descriptor
-    pub segment: AtomicU32,
-    pub chunk: AtomicU32,
-    pub len: AtomicUsize,
+    segment: AtomicU32,
+    chunk: AtomicU32,
+    len: AtomicUsize,
 }
+
+impl ChunkHeaderType {
+    pub fn set_data_descriptor(&self, descriptor: &ChunkDescriptor) {
+        self.segment.store(descriptor.segment, Ordering::Relaxed);
+        self.chunk.store(descriptor.chunk, Ordering::Relaxed);
+        self.len.store(descriptor.len.into(), Ordering::Relaxed);
+    }
+
+    pub fn data_descriptor(&self) -> ChunkDescriptor {
+        ChunkDescriptor::new(
+            self.segment.load(Ordering::Relaxed),
+            self.chunk.load(Ordering::Relaxed),
+            // SAFETY: this is safe because Write access to self.len is available only from set_data_descriptor
+            unsafe { NonZeroUsize::new_unchecked(self.len.load(Ordering::Relaxed)) },
+        )
+    }
+}
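
The new `data_descriptor` getter reconstructs the descriptor through relaxed atomic loads, and its `unsafe` block leans on the invariant that `len` is only ever written by `set_data_descriptor`, which takes the length as a `NonZeroUsize`. Here is a small stand-alone sketch of the same store/reload pattern using the checked constructor in place of `new_unchecked`; the type and field names are simplified stand-ins, not the crate's actual API.

```rust
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

#[derive(Debug, PartialEq)]
struct Descriptor {
    segment: u32,
    chunk: u32,
    len: NonZeroUsize,
}

#[derive(Default)]
struct Header {
    segment: AtomicU32,
    chunk: AtomicU32,
    len: AtomicUsize,
}

impl Header {
    fn set_data_descriptor(&self, d: &Descriptor) {
        self.segment.store(d.segment, Ordering::Relaxed);
        self.chunk.store(d.chunk, Ordering::Relaxed);
        // d.len is a NonZeroUsize, so this store can never write a zero...
        self.len.store(d.len.get(), Ordering::Relaxed);
    }

    fn data_descriptor(&self) -> Descriptor {
        Descriptor {
            segment: self.segment.load(Ordering::Relaxed),
            chunk: self.chunk.load(Ordering::Relaxed),
            // ...which is the invariant the real code relies on for new_unchecked;
            // the checked form used here just panics if that ever stopped holding.
            len: NonZeroUsize::new(self.len.load(Ordering::Relaxed))
                .expect("len is written only by set_data_descriptor"),
        }
    }
}

fn main() {
    let header = Header::default();
    let descriptor = Descriptor {
        segment: 3,
        chunk: 9,
        len: NonZeroUsize::new(4096).unwrap(),
    };
    header.set_data_descriptor(&descriptor);
    assert_eq!(header.data_descriptor(), descriptor);
}
```

The checked form trades a branch for not having to rely on that write-path invariant; the commit keeps the unchecked variant and documents the invariant in the SAFETY comment instead.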
26 changes: 15 additions & 11 deletions commons/zenoh-shm/src/reader.rs
@@ -22,8 +22,8 @@ use crate::{
         client::shm_segment::ShmSegment,
         client_storage::ShmClientStorage,
         common::types::{ProtocolID, SegmentID},
+        provider::chunk::ChunkDescriptor,
     },
-    header::chunk_header::ChunkHeaderType,
     metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION,
     watchdog::confirmator::GLOBAL_CONFIRMATOR,
     ShmBufInfo, ShmBufInner,
@@ -55,14 +55,18 @@ impl ShmReader {
         // attach to the watchdog before doing other things
         let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata));
 
-        let segment = self.ensure_data_segment(confirmed_metadata.owned.header())?;
-        let buf = segment.map(
+        // retrieve data descriptor from metadata
+        let data_descriptor = confirmed_metadata.owned.header().data_descriptor();
+
+        let segment = self.ensure_data_segment(
             confirmed_metadata
                 .owned
                 .header()
-                .chunk
+                .protocol
                 .load(std::sync::atomic::Ordering::Relaxed),
+            &data_descriptor,
         )?;
+        let buf = segment.map(data_descriptor.chunk)?;
         let shmb = ShmBufInner {
             metadata: confirmed_metadata,
             buf,
@@ -76,11 +80,12 @@
         }
     }
 
-    fn ensure_data_segment(&self, header: &ChunkHeaderType) -> ZResult<Arc<dyn ShmSegment>> {
-        let id = GlobalDataSegmentID::new(
-            header.protocol.load(std::sync::atomic::Ordering::Relaxed),
-            header.segment.load(std::sync::atomic::Ordering::Relaxed),
-        );
+    fn ensure_data_segment(
+        &self,
+        protocol_id: ProtocolID,
+        descriptor: &ChunkDescriptor,
+    ) -> ZResult<Arc<dyn ShmSegment>> {
+        let id = GlobalDataSegmentID::new(protocol_id, descriptor.segment);
 
         // fastest path: try to get access to already mounted SHM segment
         // read lock allows concurrent execution of multiple requests
@@ -110,8 +115,7 @@
 
             // (common case) mount a new segment and add it to the map
             std::collections::hash_map::Entry::Vacant(vacant) => {
-                let new_segment =
-                    client.attach(header.segment.load(std::sync::atomic::Ordering::Relaxed))?;
+                let new_segment = client.attach(descriptor.segment)?;
                 Ok(vacant.insert(new_segment).clone())
             }
         }
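
With these changes the reader's flow becomes: read the descriptor once from the confirmed metadata header, resolve the data segment from the protocol id plus the descriptor's segment id, then map the referenced chunk. A rough self-contained sketch of that call order follows; the types and signatures are hypothetical simplifications, not the actual `ShmReader`/`ShmSegment` API.

```rust
// Stand-in types, not the actual zenoh-shm API: this only models the new call order.
#[derive(Clone, Copy, Debug)]
struct ChunkDescriptor {
    segment: u32,
    chunk: u32,
}

struct Segment {
    id: u32,
}

impl Segment {
    fn map(&self, chunk: u32) -> Result<Vec<u8>, String> {
        // The real code maps a region of the shared-memory segment here.
        println!("mapping chunk {} of segment {}", chunk, self.id);
        Ok(vec![0u8; 16])
    }
}

struct Reader;

impl Reader {
    // After this commit the lookup takes the protocol id and the descriptor
    // instead of the raw metadata header.
    fn ensure_data_segment(
        &self,
        protocol_id: u32,
        descriptor: &ChunkDescriptor,
    ) -> Result<Segment, String> {
        println!("ensuring segment for protocol {protocol_id}");
        Ok(Segment { id: descriptor.segment })
    }
}

fn main() -> Result<(), String> {
    // 1. the descriptor is read once from the shared metadata header
    let descriptor = ChunkDescriptor { segment: 1, chunk: 7 };
    // 2. resolve (or mount) the data segment from (protocol, segment id)
    let reader = Reader;
    let segment = reader.ensure_data_segment(0, &descriptor)?;
    // 3. map the referenced chunk inside that segment
    let _buf = segment.map(descriptor.chunk)?;
    Ok(())
}
```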
