From adb7e28ba5be746eeb3b4d30329b30fb287ecb84 Mon Sep 17 00:00:00 2001 From: Jeron Aldaron Lau Date: Sun, 30 Oct 2022 15:07:18 -0500 Subject: [PATCH 1/2] Add lockless WakeList --- examples/mpmc.rs | 29 ++++++ src/lib.rs | 4 + src/list.rs | 228 +++++++++++++++++++++++++++++++++++++++++++++++ src/tcms.rs | 79 ++++++++++++++++ src/wake.rs | 99 ++++++++++++++++++++ 5 files changed, 439 insertions(+) create mode 100644 examples/mpmc.rs create mode 100644 src/list.rs create mode 100644 src/tcms.rs create mode 100644 src/wake.rs diff --git a/examples/mpmc.rs b/examples/mpmc.rs new file mode 100644 index 0000000..2966966 --- /dev/null +++ b/examples/mpmc.rs @@ -0,0 +1,29 @@ +use whisk::{Channel, Stream}; + +fn main() { + let executor = pasts::Executor::default(); + let channel = Stream::from(Channel::new()); + for _ in 0..24 { + let channel = channel.clone(); + std::thread::spawn(|| { + pasts::Executor::default().spawn(async move { + println!("Sending..."); + channel.send(Some(1)).await; + let count = Stream::strong_count(&channel); + println!("Sent {count}"); + if count <= 2 { + channel.send(None).await; + } + }) + }); + } + executor.spawn(async move { + let mut c = 0; + while let Some(v) = channel.recv().await { + println!("Received one."); + c += v; + } + println!("Received all."); + assert_eq!(c, 24); + }); +} diff --git a/src/lib.rs b/src/lib.rs index e4a3fe3..8ee1bdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,10 @@ extern crate alloc; +mod list; +mod tcms; +mod wake; + use alloc::{ sync::{Arc, Weak}, vec::Vec, diff --git a/src/list.rs b/src/list.rs new file mode 100644 index 0000000..eb4c4f7 --- /dev/null +++ b/src/list.rs @@ -0,0 +1,228 @@ +//! List implementations + +#![allow(unsafe_code)] + +use alloc::{boxed::Box, vec::Vec}; +use core::mem::MaybeUninit; + +/// Dynamic list (grow or fixed) +pub(crate) enum DynList { + Grow(GrowList), + Fixed(FixedList), +} + +impl DynList { + pub(crate) fn merge( + self: &mut Box, + mut other: Box>, + ) { + use DynList::*; + match **self { + Grow(ref mut grow) => { + match *other { + Grow(ref mut l) => grow.0.extend(l.0.drain(..)), + Fixed(ref mut l) => { + grow.0.extend( + l.data[..l.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + // Don't drop items from other list + core::mem::forget(other); + } + } + } + Fixed(ref mut list) => { + match *other { + Grow(mut l) => { + l.0.splice( + ..0, + list.data[..list.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + let mut new = Grow(l); + core::mem::swap(&mut **self, &mut new); + // Don't drop items from this list + core::mem::forget(new); + } + Fixed(ref mut l) => { + if l.len() + list.len() > CAP { + let mut vec = Vec::new(); + vec.extend( + list.data[..list.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + vec.extend( + l.data[..l.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + let mut new = Grow(GrowList(vec)); + core::mem::swap(&mut **self, &mut new); + // Don't drop items from this list + core::mem::forget(new); + } else { + for item in l.data[..l.size].iter() { + self.push(unsafe { item.assume_init_read() }); + } + } + // Don't drop items from other list + core::mem::forget(other); + } + } + } + } + } +} + +impl Default for DynList { + fn default() -> Self { + Self::Fixed(FixedList::default()) + } +} + +pub(crate) struct GrowList(Vec); + +impl Default for GrowList { + fn default() -> Self { + Self(Vec::default()) + } +} + +pub(crate) struct FixedList { + size: usize, + data: [MaybeUninit; CAP], +} + +impl Default for FixedList { + fn default() -> Self { + let size = 0; + let data = uninit_array::(); + + Self { size, data } + } +} + +pub(crate) trait List { + fn push(&mut self, item: T); + fn pop(&mut self) -> Option; + fn len(&self) -> usize; + // fn get(&mut self, index: usize) -> &mut T; +} + +impl List for DynList { + fn push(&mut self, item: T) { + match self { + DynList::Grow(ref mut list) => list.push(item), + DynList::Fixed(ref mut list) => { + if list.len() == CAP { + let mut vec = + Vec::from(unsafe { array_assume_init(&list.data) }); + vec.push(item); + *self = DynList::Grow(GrowList(vec)); + } else { + list.push(item); + } + } + } + } + + fn pop(&mut self) -> Option { + match self { + DynList::Grow(ref mut list) => list.pop(), + DynList::Fixed(ref mut list) => list.pop(), + } + } + + fn len(&self) -> usize { + match self { + DynList::Grow(ref list) => list.len(), + DynList::Fixed(ref list) => list.len(), + } + } + + /*fn get(&mut self, index: usize) -> &mut T { + match self { + DynList::Grow(ref mut list) => list.get(index), + DynList::Fixed(ref mut list) => list.get(index), + } + }*/ +} + +impl List for GrowList { + fn push(&mut self, item: T) { + self.0.push(item); + } + + fn pop(&mut self) -> Option { + self.0.pop() + } + + fn len(&self) -> usize { + self.0.len() + } + + /*fn get(&mut self, index: usize) -> &mut T { + self.0.get_mut(index).unwrap() + }*/ +} + +impl List for FixedList { + fn push(&mut self, item: T) { + assert_ne!(self.size, CAP); + self.data[self.size].write(item); + self.size += 1; + } + + fn pop(&mut self) -> Option { + if self.size == 0 { + None + } else { + self.size -= 1; + Some(unsafe { self.data[self.size].assume_init_read() }) + } + } + + fn len(&self) -> usize { + self.size + } + + /*fn get(&mut self, index: usize) -> &mut T { + assert!(index < self.size); + + unsafe { self.data[index].assume_init_mut() } + }*/ +} + +impl Drop for FixedList { + fn drop(&mut self) { + for item in self.data[..self.size].iter_mut() { + unsafe { item.assume_init_drop() } + } + } +} + +/// Can be removed once https://github.com/rust-lang/rust/issues/96097 resolves +#[must_use] +#[inline(always)] +const fn uninit_array() -> [MaybeUninit; N] { + // SAFETY: An uninitialized `[MaybeUninit<_>; LEN]` is valid. + unsafe { MaybeUninit::<[MaybeUninit; N]>::uninit().assume_init() } +} + +#[inline(always)] +unsafe fn array_assume_init( + array: &[MaybeUninit; N], +) -> [T; N] { + // SAFETY: + // * The caller guarantees that all elements of the array are initialized + // * `MaybeUninit` and T are guaranteed to have the same layout + // * `MaybeUninit` does not drop, so there are no double-frees + // And thus the conversion is safe + let array: *const _ = array; + let array: *const [T; N] = array.cast(); + + array.read() +} diff --git a/src/tcms.rs b/src/tcms.rs new file mode 100644 index 0000000..4d54f7b --- /dev/null +++ b/src/tcms.rs @@ -0,0 +1,79 @@ +//! Lockless synchronization + +#![allow(unsafe_code)] + +use alloc::boxed::Box; +use core::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, +}; + +/// Take-create-merge-swap +/// +/// Essentially the opposite of RCU (read-copy-update), optimized for writing +/// rather than reading. +/// +/// - Task 1 takes the inner value +/// - Task 2 sees Task 1 has ownership of value +/// - Task 2 creates a new empty/default value +/// - Task 2 writes to new value +/// - Task 1 returns value +/// - Task 2 checks if value has been returned and swaps if not +/// - Task 2 takes ownership of other value if returned and merges then returns +/// +/// One thing to keep in mind when using this type is that not all values will +/// be available at all times. +pub(crate) struct Tcms(AtomicPtr); + +impl Tcms { + /// Create new TCMS + pub(crate) fn new() -> Self { + Self(AtomicPtr::new(Box::into_raw(Box::new(T::default())))) + } + + /// Run `f` with merger `m`. + /// + /// Merger is unstable, can't expect order to be preserved + pub(crate) fn with( + &self, + f: impl FnOnce(&mut T) -> R, + m: impl Fn(&mut Box, Box), + ) -> R { + // Swap with null pointer + let list = self.0.swap(ptr::null_mut(), Ordering::Acquire); + let mut list = if list.is_null() { + Box::new(T::default()) + } else { + unsafe { Box::from_raw(list) } + }; + + // Run closure with list + let r = f(&mut *list); + + // Merge lists if needed + let mut new = Box::into_raw(list); + while self + .0 + .compare_exchange( + core::ptr::null_mut(), + new, + Ordering::Release, + Ordering::Relaxed, + ) + .is_err() + { + let other = self.0.swap(ptr::null_mut(), Ordering::Acquire); + if !other.is_null() { + let mut a = unsafe { Box::from_raw(new) }; + let b = unsafe { Box::from_raw(other) }; + m(&mut a, b); + new = Box::into_raw(a); + } else { + // Too much contention with other task, try again + core::hint::spin_loop(); + }; + } + + r + } +} diff --git a/src/wake.rs b/src/wake.rs new file mode 100644 index 0000000..a959a9e --- /dev/null +++ b/src/wake.rs @@ -0,0 +1,99 @@ +//! WakeList implementation + +use alloc::boxed::Box; +use core::{ + sync::atomic::{AtomicUsize, Ordering}, + task::Waker, +}; + +use crate::list::{DynList, List}; +use crate::tcms::Tcms; + +pub(crate) struct RecvHandle(usize); + +pub(crate) struct SendHandle(usize); + +/// Lockless MPMC multi-waker +pub(crate) struct WakeList { + /// List of wakers, with up to 2 on the stack before falling back to heap + send: Tcms), 2>>, + /// List of wakers, with up to 2 on the stack before falling back to heap + recv: Tcms), 2>>, + /// List of garbage, with up to 2 on the stack before falling back to heap + garbage: Tcms>, + /// Next slot + slot: AtomicUsize, +} + +impl WakeList { + /// Create a new lockless multi-waker + pub(crate) fn new(&self) -> Self { + let send = Tcms::new(); + let recv = Tcms::new(); + let garbage = Tcms::new(); + let slot = AtomicUsize::new(0); + + Self { send, recv, garbage, slot } + } + + /// Allocate a new slot in the consumer waker list + fn alloc_recv(&self) -> RecvHandle { + let id = self.garbage.with(|g| g.pop(), merge).unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.recv.with(|list| list.push((id, None)), merge); + RecvHandle(id) + } + + /// Allocate a new slot in the producer waker list + fn alloc_send(&self) -> SendHandle { + let id = self.garbage.with(|g| g.pop(), merge).unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.send.with(|list| list.push((id, None)), merge); + SendHandle(id) + } + + /* + /// Overwrite stored waker for a producer handle + fn when_send(&self, handle: SendHandle, waker: Waker) { + self.send.with(|list| *list.get(handle.0) = Some(waker), merge); + } + + /// Overwrite stored waker for a producer handle + fn when_recv(&self, handle: RecvHandle, waker: Waker) { + self.recv.with(|list| *list.get(handle.0) = Some(waker), merge); + } + */ + + + /// Free a handle to be reused + fn free_send(&self, _handle: SendHandle) { + todo!() + } + + /// Free a handle to be reused + fn free_recv(&self, _handle: RecvHandle) { + todo!() + } + + /// Wake one waker + fn begin_wake_one_send(&self) { + todo!() + } + + /// Wake all wakers + fn begin_wake_all_send(&self) { + todo!() + } + + /// Wake one waker + fn begin_wake_one_recv(&self) { + todo!() + } + + /// Wake all wakers + fn begin_wake_all_recv(&self) { + todo!() + } +} + +fn merge(orig: &mut Box>, other: Box>) { + orig.merge(other) +} From cbe0aceffbecc4cd7a3084f8ae35487a0f4e027d Mon Sep 17 00:00:00 2001 From: Jeron Aldaron Lau Date: Sun, 30 Oct 2022 15:32:20 -0500 Subject: [PATCH 2/2] Get list slices --- src/list.rs | 35 ++++++++++++++++++------------ src/wake.rs | 62 ++++++++++++++++++++++++++--------------------------- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/src/list.rs b/src/list.rs index eb4c4f7..4ad12ea 100644 --- a/src/list.rs +++ b/src/list.rs @@ -20,7 +20,7 @@ impl DynList { match **self { Grow(ref mut grow) => { match *other { - Grow(ref mut l) => grow.0.extend(l.0.drain(..)), + Grow(ref mut l) => grow.0.append(&mut l.0), Fixed(ref mut l) => { grow.0.extend( l.data[..l.size] @@ -109,7 +109,7 @@ pub(crate) trait List { fn push(&mut self, item: T); fn pop(&mut self) -> Option; fn len(&self) -> usize; - // fn get(&mut self, index: usize) -> &mut T; + fn as_slice(&mut self) -> &mut [T]; } impl List for DynList { @@ -143,12 +143,12 @@ impl List for DynList { } } - /*fn get(&mut self, index: usize) -> &mut T { + fn as_slice(&mut self) -> &mut [T] { match self { - DynList::Grow(ref mut list) => list.get(index), - DynList::Fixed(ref mut list) => list.get(index), + DynList::Grow(ref mut list) => list.as_slice(), + DynList::Fixed(ref mut list) => list.as_slice(), } - }*/ + } } impl List for GrowList { @@ -164,9 +164,9 @@ impl List for GrowList { self.0.len() } - /*fn get(&mut self, index: usize) -> &mut T { - self.0.get_mut(index).unwrap() - }*/ + fn as_slice(&mut self) -> &mut [T] { + self.0.as_mut_slice() + } } impl List for FixedList { @@ -189,11 +189,9 @@ impl List for FixedList { self.size } - /*fn get(&mut self, index: usize) -> &mut T { - assert!(index < self.size); - - unsafe { self.data[index].assume_init_mut() } - }*/ + fn as_slice(&mut self) -> &mut [T] { + unsafe { slice_assume_init_mut(&mut self.data[..self.size]) } + } } impl Drop for FixedList { @@ -226,3 +224,12 @@ unsafe fn array_assume_init( array.read() } + +#[inline(always)] +unsafe fn slice_assume_init_mut(slice: &mut [MaybeUninit]) -> &mut [T] { + // SAFETY: similar to safety notes for `slice_get_ref`, but we have a + // mutable reference which is also guaranteed to be valid for writes. + let slice: *mut _ = slice; + + &mut *(slice as *mut [T]) +} diff --git a/src/wake.rs b/src/wake.rs index a959a9e..1973a4f 100644 --- a/src/wake.rs +++ b/src/wake.rs @@ -6,8 +6,10 @@ use core::{ task::Waker, }; -use crate::list::{DynList, List}; -use crate::tcms::Tcms; +use crate::{ + list::{DynList, List}, + tcms::Tcms, +}; pub(crate) struct RecvHandle(usize); @@ -27,49 +29,47 @@ pub(crate) struct WakeList { impl WakeList { /// Create a new lockless multi-waker - pub(crate) fn new(&self) -> Self { + pub(crate) fn new() -> Self { let send = Tcms::new(); let recv = Tcms::new(); let garbage = Tcms::new(); let slot = AtomicUsize::new(0); - Self { send, recv, garbage, slot } + Self { + send, + recv, + garbage, + slot, + } } - /// Allocate a new slot in the consumer waker list - fn alloc_recv(&self) -> RecvHandle { - let id = self.garbage.with(|g| g.pop(), merge).unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); - self.recv.with(|list| list.push((id, None)), merge); + /// Set waker + fn when_recv(&self, waker: Waker) -> RecvHandle { + let id = self + .garbage + .with(|g| g.pop(), merge) + .unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.recv.with(|list| list.push((id, Some(waker))), merge); RecvHandle(id) } - - /// Allocate a new slot in the producer waker list - fn alloc_send(&self) -> SendHandle { - let id = self.garbage.with(|g| g.pop(), merge).unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); - self.send.with(|list| list.push((id, None)), merge); - SendHandle(id) - } - - /* - /// Overwrite stored waker for a producer handle - fn when_send(&self, handle: SendHandle, waker: Waker) { - self.send.with(|list| *list.get(handle.0) = Some(waker), merge); - } - /// Overwrite stored waker for a producer handle - fn when_recv(&self, handle: RecvHandle, waker: Waker) { - self.recv.with(|list| *list.get(handle.0) = Some(waker), merge); + /// Set waker + fn when_send(&self, waker: Waker) -> SendHandle { + let id = self + .garbage + .with(|g| g.pop(), merge) + .unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.send.with(|list| list.push((id, Some(waker))), merge); + SendHandle(id) } - */ - - /// Free a handle to be reused - fn free_send(&self, _handle: SendHandle) { + /// Free a send handle to be reused + fn begin_free_send(&self, _handle: SendHandle) { todo!() } - - /// Free a handle to be reused - fn free_recv(&self, _handle: RecvHandle) { + + /// Free a recv handle to be reused + fn begin_free_recv(&self, _handle: RecvHandle) { todo!() }