diff --git a/Cargo.toml b/Cargo.toml
index d4f49cf..514a6c5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ generic_once_cell = "0.1"
 interrupts = "0.1"
 interrupt-mutex = "0.1"
 lock_api = "0.4"
+spinning_top = "0.3.0"
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/src/backoff.rs b/src/backoff.rs
deleted file mode 100644
index 1eb19f7..0000000
--- a/src/backoff.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-#[derive(Debug)]
-pub struct Backoff {
-    step: u8,
-}
-
-impl Backoff {
-    const YIELD_LIMIT: u8 = 10;
-
-    #[inline]
-    pub fn new() -> Self {
-        Backoff { step: 0 }
-    }
-
-    #[inline]
-    pub fn snooze(&mut self) {
-        for _ in 0..1_u16 << self.step {
-            core::hint::spin_loop();
-        }
-
-        if !self.is_completed() {
-            self.step += 1;
-        }
-    }
-
-    #[inline]
-    pub fn is_completed(&self) -> bool {
-        self.step > Self::YIELD_LIMIT
-    }
-}
-
-impl Default for Backoff {
-    fn default() -> Backoff {
-        Backoff::new()
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index a2758c0..c19b155 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -88,9 +88,24 @@
 #![cfg_attr(not(test), no_std)]
 #![warn(unsafe_op_in_unsafe_fn)]
 
-mod backoff;
 pub(crate) mod mutex;
-pub(crate) mod rwlock;
+pub(crate) mod rwlock {
+    /// A simple spinning, read-preferring readers-writer lock with exponential backoff.
+    pub type RawRwSpinLock = spinning_top::RawRwSpinlock<spinning_top::relax::Backoff>;
+
+    /// A [`lock_api::RwLock`] based on [`RawRwSpinLock`].
+    pub type RwSpinLock<T> = lock_api::RwLock<RawRwSpinLock, T>;
+
+    /// A [`lock_api::RwLockReadGuard`] based on [`RawRwSpinLock`].
+    pub type RwSpinLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawRwSpinLock, T>;
+
+    /// A [`lock_api::RwLockUpgradableReadGuard`] based on [`RawRwSpinLock`].
+    pub type RwSpinLockUpgradableReadGuard<'a, T> =
+        lock_api::RwLockUpgradableReadGuard<'a, RawRwSpinLock, T>;
+
+    /// A [`lock_api::RwLockWriteGuard`] based on [`RawRwSpinLock`].
+    pub type RwSpinLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawRwSpinLock, T>;
+}
 
 pub use exclusive_cell::{CallOnce, CallOnceError, ExclusiveCell};
 pub use interrupt_mutex::{InterruptMutex, InterruptMutexGuard, RawInterruptMutex};
diff --git a/src/mutex/mod.rs b/src/mutex/mod.rs
index c8cf0b4..2b7c91d 100644
--- a/src/mutex/mod.rs
+++ b/src/mutex/mod.rs
@@ -1,4 +1,13 @@
-pub(crate) mod spin;
+pub(crate) mod spin {
+    /// A simple spinlock with exponential backoff.
+    pub type RawSpinMutex = spinning_top::RawSpinlock<spinning_top::relax::Backoff>;
+
+    /// A [`lock_api::Mutex`] based on [`RawSpinMutex`].
+    pub type SpinMutex<T> = lock_api::Mutex<RawSpinMutex, T>;
+
+    /// A [`lock_api::MutexGuard`] based on [`RawSpinMutex`].
+    pub type SpinMutexGuard<'a, T> = lock_api::MutexGuard<'a, RawSpinMutex, T>;
+}
 pub(crate) mod ticket;
 
 use interrupt_mutex::RawInterruptMutex;
diff --git a/src/mutex/spin.rs b/src/mutex/spin.rs
deleted file mode 100644
index 01f8cb3..0000000
--- a/src/mutex/spin.rs
+++ /dev/null
@@ -1,218 +0,0 @@
-use core::sync::atomic::{AtomicBool, Ordering};
-
-use lock_api::{GuardSend, RawMutex};
-
-use crate::backoff::Backoff;
-
-/// A simple [test and test-and-set] [spinlock] with [exponential backoff].
-///
-/// [test and test-and-set]: https://en.wikipedia.org/wiki/Test_and_test-and-set
-/// [spinlock]: https://en.wikipedia.org/wiki/Spinlock
-/// [exponential backoff]: https://en.wikipedia.org/wiki/Exponential_backoff
-// Based on `spin::mutex::SpinMutex`, but with backoff.
-pub struct RawSpinMutex {
-    lock: AtomicBool,
-}
-
-unsafe impl RawMutex for RawSpinMutex {
-    #[allow(clippy::declare_interior_mutable_const)]
-    const INIT: Self = Self {
-        lock: AtomicBool::new(false),
-    };
-
-    type GuardMarker = GuardSend;
-
-    #[inline]
-    fn lock(&self) {
-        let mut backoff = Backoff::new();
-        while self
-            .lock
-            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
-            .is_err()
-        {
-            while self.is_locked() {
-                backoff.snooze();
-            }
-        }
-    }
-
-    #[inline]
-    fn try_lock(&self) -> bool {
-        self.lock
-            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
-            .is_ok()
-    }
-
-    #[inline]
-    unsafe fn unlock(&self) {
-        self.lock.store(false, Ordering::Release);
-    }
-
-    #[inline]
-    fn is_locked(&self) -> bool {
-        self.lock.load(Ordering::Relaxed)
-    }
-}
-
-/// A [`lock_api::Mutex`] based on [`RawSpinMutex`].
-pub type SpinMutex<T> = lock_api::Mutex<RawSpinMutex, T>;
-
-/// A [`lock_api::MutexGuard`] based on [`RawSpinMutex`].
-pub type SpinMutexGuard<'a, T> = lock_api::MutexGuard<'a, RawSpinMutex, T>;
-
-// From `spin::mutex::spin`
-#[cfg(test)]
-mod tests {
-    use std::sync::atomic::{AtomicUsize, Ordering};
-    use std::sync::mpsc::channel;
-    use std::sync::Arc;
-    use std::{mem, thread};
-
-    use super::*;
-
-    #[test]
-    fn smoke() {
-        let m = SpinMutex::<_>::new(());
-        drop(m.lock());
-        drop(m.lock());
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)]
-    fn lots_and_lots() {
-        static M: SpinMutex<()> = SpinMutex::<_>::new(());
-        static mut CNT: u32 = 0;
-        const J: u32 = 1000;
-        const K: u32 = 3;
-
-        fn inc() {
-            for _ in 0..J {
-                unsafe {
-                    let _g = M.lock();
-                    CNT += 1;
-                }
-            }
-        }
-
-        let (tx, rx) = channel();
-        for _ in 0..K {
-            let tx2 = tx.clone();
-            thread::spawn(move || {
-                inc();
-                tx2.send(()).unwrap();
-            });
-            let tx2 = tx.clone();
-            thread::spawn(move || {
-                inc();
-                tx2.send(()).unwrap();
-            });
-        }
-
-        drop(tx);
-        for _ in 0..2 * K {
-            rx.recv().unwrap();
-        }
-        assert_eq!(unsafe { CNT }, J * K * 2);
-    }
-
-    #[test]
-    fn try_lock() {
-        let mutex = SpinMutex::<_>::new(42);
-
-        // First lock succeeds
-        let a = mutex.try_lock();
-        assert_eq!(a.as_ref().map(|r| **r), Some(42));
-
-        // Additional lock failes
-        let b = mutex.try_lock();
-        assert!(b.is_none());
-
-        // After dropping lock, it succeeds again
-        ::core::mem::drop(a);
-        let c = mutex.try_lock();
-        assert_eq!(c.as_ref().map(|r| **r), Some(42));
-    }
-
-    #[test]
-    fn test_into_inner() {
-        let m = SpinMutex::<_>::new(Box::new(10));
-        assert_eq!(m.into_inner(), Box::new(10));
-    }
-
-    #[test]
-    fn test_into_inner_drop() {
-        struct Foo(Arc<AtomicUsize>);
-        impl Drop for Foo {
-            fn drop(&mut self) {
-                self.0.fetch_add(1, Ordering::SeqCst);
-            }
-        }
-        let num_drops = Arc::new(AtomicUsize::new(0));
-        let m = SpinMutex::<_>::new(Foo(num_drops.clone()));
-        assert_eq!(num_drops.load(Ordering::SeqCst), 0);
-        {
-            let _inner = m.into_inner();
-            assert_eq!(num_drops.load(Ordering::SeqCst), 0);
-        }
-        assert_eq!(num_drops.load(Ordering::SeqCst), 1);
-    }
-
-    #[test]
-    fn test_mutex_arc_nested() {
-        // Tests nested mutexes and access
-        // to underlying data.
-        let arc = Arc::new(SpinMutex::<_>::new(1));
-        let arc2 = Arc::new(SpinMutex::<_>::new(arc));
-        let (tx, rx) = channel();
-        let _t = thread::spawn(move || {
-            let lock = arc2.lock();
-            let lock2 = lock.lock();
-            assert_eq!(*lock2, 1);
-            tx.send(()).unwrap();
-        });
-        rx.recv().unwrap();
-    }
-
-    #[test]
-    fn test_mutex_arc_access_in_unwind() {
-        let arc = Arc::new(SpinMutex::<_>::new(1));
-        let arc2 = arc.clone();
-        let _ = thread::spawn(move || -> () {
-            struct Unwinder {
-                i: Arc<SpinMutex<i32>>,
-            }
-            impl Drop for Unwinder {
-                fn drop(&mut self) {
-                    *self.i.lock() += 1;
-                }
-            }
-            let _u = Unwinder { i: arc2 };
-            panic!();
-        })
-        .join();
-        let lock = arc.lock();
-        assert_eq!(*lock, 2);
-    }
-
-    #[test]
-    fn test_mutex_unsized() {
-        let mutex: &SpinMutex<[i32]> = &SpinMutex::<_>::new([1, 2, 3]);
-        {
-            let b = &mut *mutex.lock();
-            b[0] = 4;
-            b[2] = 5;
-        }
-        let comp: &[i32] = &[4, 2, 5];
-        assert_eq!(&*mutex.lock(), comp);
-    }
-
-    #[test]
-    fn test_mutex_force_lock() {
-        let lock = SpinMutex::<_>::new(());
-        mem::forget(lock.lock());
-        unsafe {
-            lock.force_unlock();
-        }
-        assert!(lock.try_lock().is_some());
-    }
-}
diff --git a/src/mutex/ticket.rs b/src/mutex/ticket.rs
index 658610e..4203033 100644
--- a/src/mutex/ticket.rs
+++ b/src/mutex/ticket.rs
@@ -2,7 +2,7 @@ use core::sync::atomic::{AtomicUsize, Ordering};
 
 use lock_api::{GuardSend, RawMutex, RawMutexFair};
 
-use crate::backoff::Backoff;
+use spinning_top::relax::{Backoff, Relax};
 
 /// A [fair] [ticket lock] with [exponential backoff].
 ///
@@ -28,9 +28,9 @@ unsafe impl RawMutex for RawTicketMutex {
     fn lock(&self) {
         let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed);
 
-        let mut backoff = Backoff::new();
+        let mut backoff = Backoff::default();
         while self.next_serving.load(Ordering::Acquire) != ticket {
-            backoff.snooze();
+            backoff.relax();
         }
     }
 
diff --git a/src/rwlock.rs b/src/rwlock.rs
deleted file mode 100644
index 6e2f529..0000000
--- a/src/rwlock.rs
+++ /dev/null
@@ -1,456 +0,0 @@
-use core::sync::atomic::{AtomicUsize, Ordering};
-
-use lock_api::{
-    GuardSend, RawRwLock, RawRwLockDowngrade, RawRwLockRecursive, RawRwLockUpgrade,
-    RawRwLockUpgradeDowngrade,
-};
-
-use crate::backoff::Backoff;
-
-/// A simple spinning, read-preferring [readers-writer lock] with [exponential backoff].
-///
-/// [readers-writer lock]: https://en.wikipedia.org/wiki/Readers-writer_lock
-/// [exponential backoff]: https://en.wikipedia.org/wiki/Exponential_backoff
-// Based on `spin::rwlock::RwLock`, but with backoff and separation of UPGRADABLE and EXCLUSIVE.
-pub struct RawRwSpinLock {
-    lock: AtomicUsize,
-}
-
-/// Normal shared lock counter
-const SHARED: usize = 1 << 2;
-/// Special upgradable shared lock flag
-const UPGRADABLE: usize = 1 << 1;
-/// Exclusive lock flag
-const EXCLUSIVE: usize = 1;
-
-impl RawRwSpinLock {
-    #[inline]
-    fn is_locked_shared(&self) -> bool {
-        self.lock.load(Ordering::Relaxed) & !(EXCLUSIVE | UPGRADABLE) != 0
-    }
-
-    #[inline]
-    fn is_locked_upgradable(&self) -> bool {
-        self.lock.load(Ordering::Relaxed) & UPGRADABLE == UPGRADABLE
-    }
-
-    /// Acquire a shared lock, returning the new lock value.
-    #[inline]
-    fn acquire_shared(&self) -> usize {
-        let value = self.lock.fetch_add(SHARED, Ordering::Acquire);
-
-        // An arbitrary cap that allows us to catch overflows long before they happen
-        if value > usize::MAX / 2 {
-            self.lock.fetch_sub(SHARED, Ordering::Relaxed);
-            panic!("Too many shared locks, cannot safely proceed");
-        }
-
-        value
-    }
-}
-
-unsafe impl RawRwLock for RawRwSpinLock {
-    #[allow(clippy::declare_interior_mutable_const)]
-    const INIT: Self = Self {
-        lock: AtomicUsize::new(0),
-    };
-
-    type GuardMarker = GuardSend;
-
-    #[inline]
-    fn lock_shared(&self) {
-        let mut backoff = Backoff::new();
-        while !self.try_lock_shared() {
-            backoff.snooze();
-        }
-    }
-
-    #[inline]
-    fn try_lock_shared(&self) -> bool {
-        let value = self.acquire_shared();
-
-        let acquired = value & EXCLUSIVE != EXCLUSIVE;
-
-        if !acquired {
-            unsafe {
-                self.unlock_shared();
-            }
-        }
-
-        acquired
-    }
-
-    #[inline]
-    unsafe fn unlock_shared(&self) {
-        debug_assert!(self.is_locked_shared());
-
-        self.lock.fetch_sub(SHARED, Ordering::Release);
-    }
-
-    #[inline]
-    fn lock_exclusive(&self) {
-        let mut backoff = Backoff::new();
-        while self
-            .lock
-            .compare_exchange_weak(0, EXCLUSIVE, Ordering::Acquire, Ordering::Relaxed)
-            .is_err()
-        {
-            backoff.snooze();
-        }
-    }
-
-    #[inline]
-    fn try_lock_exclusive(&self) -> bool {
-        self.lock
-            .compare_exchange(0, EXCLUSIVE, Ordering::Acquire, Ordering::Relaxed)
-            .is_ok()
-    }
-
-    #[inline]
-    unsafe fn unlock_exclusive(&self) {
-        debug_assert!(self.is_locked_exclusive());
-
-        self.lock.fetch_and(!EXCLUSIVE, Ordering::Release);
-    }
-
-    #[inline]
-    fn is_locked(&self) -> bool {
-        self.lock.load(Ordering::Relaxed) != 0
-    }
-
-    #[inline]
-    fn is_locked_exclusive(&self) -> bool {
-        self.lock.load(Ordering::Relaxed) & EXCLUSIVE == EXCLUSIVE
-    }
-}
-
-unsafe impl RawRwLockRecursive for RawRwSpinLock {
-    #[inline]
-    fn lock_shared_recursive(&self) {
-        self.lock_shared();
-    }
-
-    #[inline]
-    fn try_lock_shared_recursive(&self) -> bool {
-        self.try_lock_shared()
-    }
-}
-
-unsafe impl RawRwLockDowngrade for RawRwSpinLock {
-    #[inline]
-    unsafe fn downgrade(&self) {
-        // Reserve the shared guard for ourselves
-        self.acquire_shared();
-
-        unsafe {
-            self.unlock_exclusive();
-        }
-    }
-}
-
-unsafe impl RawRwLockUpgrade for RawRwSpinLock {
-    #[inline]
-    fn lock_upgradable(&self) {
-        let mut backoff = Backoff::new();
-        while !self.try_lock_upgradable() {
-            backoff.snooze();
-        }
-    }
-
-    #[inline]
-    fn try_lock_upgradable(&self) -> bool {
-        let value = self.lock.fetch_or(UPGRADABLE, Ordering::Acquire);
-
-        let acquired = value & (UPGRADABLE | EXCLUSIVE) == 0;
-
-        if !acquired && value & UPGRADABLE == 0 {
-            unsafe {
-                self.unlock_upgradable();
-            }
-        }
-
-        acquired
-    }
-
-    #[inline]
-    unsafe fn unlock_upgradable(&self) {
-        debug_assert!(self.is_locked_upgradable());
-
-        self.lock.fetch_and(!UPGRADABLE, Ordering::Release);
-    }
-
-    #[inline]
-    unsafe fn upgrade(&self) {
-        let mut backoff = Backoff::new();
-        while self
-            .lock
-            .compare_exchange_weak(UPGRADABLE, EXCLUSIVE, Ordering::Acquire, Ordering::Relaxed)
-            .is_err()
-        {
-            backoff.snooze();
-        }
-    }
-
-    #[inline]
-    unsafe fn try_upgrade(&self) -> bool {
-        self.lock
-            .compare_exchange(UPGRADABLE, EXCLUSIVE, Ordering::Acquire, Ordering::Relaxed)
-            .is_ok()
-    }
-}
-
-unsafe impl RawRwLockUpgradeDowngrade for RawRwSpinLock {
-    #[inline]
-    unsafe fn downgrade_upgradable(&self) {
-        self.acquire_shared();
-
-        unsafe {
-            self.unlock_upgradable();
-        }
-    }
-
-    #[inline]
-    unsafe fn downgrade_to_upgradable(&self) {
-        debug_assert!(self.is_locked_exclusive());
-
-        self.lock
-            .fetch_xor(UPGRADABLE | EXCLUSIVE, Ordering::Release);
-    }
-}
-
-/// A [`lock_api::RwLock`] based on [`RawRwSpinLock`].
-pub type RwSpinLock<T> = lock_api::RwLock<RawRwSpinLock, T>;
-
-/// A [`lock_api::RwLockReadGuard`] based on [`RawRwSpinLock`].
-pub type RwSpinLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawRwSpinLock, T>;
-
-/// A [`lock_api::RwLockUpgradableReadGuard`] based on [`RawRwSpinLock`].
-pub type RwSpinLockUpgradableReadGuard<'a, T> =
-    lock_api::RwLockUpgradableReadGuard<'a, RawRwSpinLock, T>;
-
-/// A [`lock_api::RwLockWriteGuard`] based on [`RawRwSpinLock`].
-pub type RwSpinLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawRwSpinLock, T>;
-
-// Adapted from `spin::rwlock`
-#[cfg(test)]
-mod tests {
-    use std::sync::atomic::{AtomicUsize, Ordering};
-    use std::sync::mpsc::channel;
-    use std::sync::Arc;
-    use std::{mem, thread};
-
-    use lock_api::{RwLockUpgradableReadGuard, RwLockWriteGuard};
-
-    use super::*;
-
-    #[test]
-    fn test_unlock_shared() {
-        let m = RawRwSpinLock::INIT;
-        m.lock_shared();
-        m.lock_shared();
-        m.lock_shared();
-        assert!(!m.try_lock_exclusive());
-        unsafe {
-            m.unlock_shared();
-            m.unlock_shared();
-        }
-        assert!(!m.try_lock_exclusive());
-        unsafe {
-            m.unlock_shared();
-        }
-        assert!(m.try_lock_exclusive());
-    }
-
-    #[test]
-    fn test_unlock_exclusive() {
-        let m = RawRwSpinLock::INIT;
-        m.lock_exclusive();
-        assert!(!m.try_lock_shared());
-        unsafe {
-            m.unlock_exclusive();
-        }
-        assert!(m.try_lock_shared());
-    }
-
-    #[test]
-    fn smoke() {
-        let l = RwSpinLock::new(());
-        drop(l.read());
-        drop(l.write());
-        drop((l.read(), l.read()));
-        drop(l.write());
-    }
-
-    #[test]
-    fn frob() {
-        use rand::Rng;
-
-        static R: RwSpinLock<usize> = RwSpinLock::new(0);
-        const N: usize = 10;
-        const M: usize = 1000;
-
-        let (tx, rx) = channel::<()>();
-        for _ in 0..N {
-            let tx = tx.clone();
-            thread::spawn(move || {
-                let mut rng = rand::thread_rng();
-                for _ in 0..M {
-                    if rng.gen_bool(1.0 / N as f64) {
-                        drop(R.write());
-                    } else {
-                        drop(R.read());
-                    }
-                }
-                drop(tx);
-            });
-        }
-        drop(tx);
-        let _ = rx.recv();
-    }
-
-    #[test]
-    fn test_rw_arc() {
-        let arc = Arc::new(RwSpinLock::new(0));
-        let arc2 = arc.clone();
-        let (tx, rx) = channel();
-
-        thread::spawn(move || {
-            let mut lock = arc2.write();
-            for _ in 0..10 {
-                let tmp = *lock;
-                *lock = -1;
-                thread::yield_now();
-                *lock = tmp + 1;
-            }
-            tx.send(()).unwrap();
-        });
-
-        // Readers try to catch the writer in the act
-        let mut children = Vec::new();
-        for _ in 0..5 {
-            let arc3 = arc.clone();
-            children.push(thread::spawn(move || {
-                let lock = arc3.read();
-                assert!(*lock >= 0);
-            }));
-        }
-
-        // Wait for children to pass their asserts
-        for r in children {
-            assert!(r.join().is_ok());
-        }
-
-        // Wait for writer to finish
-        rx.recv().unwrap();
-        let lock = arc.read();
-        assert_eq!(*lock, 10);
-    }
-
-    #[test]
-    fn test_rw_access_in_unwind() {
-        let arc = Arc::new(RwSpinLock::new(1));
-        let arc2 = arc.clone();
-        let _ = thread::spawn(move || -> () {
-            struct Unwinder {
-                i: Arc<RwSpinLock<isize>>,
-            }
-            impl Drop for Unwinder {
-                fn drop(&mut self) {
-                    let mut lock = self.i.write();
-                    *lock += 1;
-                }
-            }
-            let _u = Unwinder { i: arc2 };
-            panic!();
-        })
-        .join();
-        let lock = arc.read();
-        assert_eq!(*lock, 2);
-    }
-
-    #[test]
-    fn test_rwlock_unsized() {
-        let rw: &RwSpinLock<[i32]> = &RwSpinLock::new([1, 2, 3]);
-        {
-            let b = &mut *rw.write();
-            b[0] = 4;
-            b[2] = 5;
-        }
-        let comp: &[i32] = &[4, 2, 5];
-        assert_eq!(&*rw.read(), comp);
-    }
-
-    #[test]
-    fn test_rwlock_try_write() {
-        let lock = RwSpinLock::new(0isize);
-        let read_guard = lock.read();
-
-        let write_result = lock.try_write();
-        match write_result {
-            None => (),
-            Some(_) => assert!(
-                false,
-                "try_write should not succeed while read_guard is in scope"
-            ),
-        }
-
-        drop(read_guard);
-    }
-
-    #[test]
-    fn test_rw_try_read() {
-        let m = RwSpinLock::new(0);
-        mem::forget(m.write());
-        assert!(m.try_read().is_none());
-    }
-
-    #[test]
-    fn test_into_inner() {
-        let m = RwSpinLock::new(Box::new(10));
-        assert_eq!(m.into_inner(), Box::new(10));
-    }
-
-    #[test]
-    fn test_into_inner_drop() {
-        struct Foo(Arc<AtomicUsize>);
-        impl Drop for Foo {
-            fn drop(&mut self) {
-                self.0.fetch_add(1, Ordering::SeqCst);
-            }
-        }
-        let num_drops = Arc::new(AtomicUsize::new(0));
-        let m = RwSpinLock::new(Foo(num_drops.clone()));
-        assert_eq!(num_drops.load(Ordering::SeqCst), 0);
-        {
-            let _inner = m.into_inner();
-            assert_eq!(num_drops.load(Ordering::SeqCst), 0);
-        }
-        assert_eq!(num_drops.load(Ordering::SeqCst), 1);
-    }
-
-    #[test]
-    fn test_upgrade_downgrade() {
-        let m = RwSpinLock::new(());
-        {
-            let _r = m.read();
-            let upg = m.try_upgradable_read().unwrap();
-            assert!(m.try_read().is_some());
-            assert!(m.try_write().is_none());
-            assert!(RwLockUpgradableReadGuard::try_upgrade(upg).is_err());
-        }
-        {
-            let w = m.write();
-            assert!(m.try_upgradable_read().is_none());
-            let _r = RwLockWriteGuard::downgrade(w);
-            assert!(m.try_upgradable_read().is_some());
-            assert!(m.try_read().is_some());
-            assert!(m.try_write().is_none());
-        }
-        {
-            let _u = m.upgradable_read();
-            assert!(m.try_upgradable_read().is_none());
-        }
-
-        assert!(RwLockUpgradableReadGuard::try_upgrade(m.try_upgradable_read().unwrap()).is_ok());
-    }
-}
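
The sketch below is an illustration accompanying the patch, not part of it. It shows how the new spinning_top-backed aliases are exercised through the pub(crate) paths visible in the diff (crate::mutex::spin::SpinMutex and crate::rwlock::RwSpinLock). The module name alias_smoke and the test bodies are assumptions for demonstration only, not code from the repository.

// Hypothetical crate-internal smoke test (illustration only, not part of the patch).
// Assumes SpinMutex<T> and RwSpinLock<T> remain lock_api wrappers, now backed by
// spinning_top's RawSpinlock/RawRwSpinlock with the Backoff relax strategy.
#[cfg(test)]
mod alias_smoke {
    use crate::mutex::spin::SpinMutex;
    use crate::rwlock::RwSpinLock;

    #[test]
    fn spin_mutex_locks_and_unlocks() {
        let counter = SpinMutex::new(0);
        {
            let mut guard = counter.lock();
            *guard += 1;
        } // guard dropped here, releasing the lock
        assert_eq!(*counter.lock(), 1);
    }

    #[test]
    fn rw_spin_lock_shares_readers_and_excludes_writers() {
        let lock = RwSpinLock::new(1);
        {
            let r1 = lock.read();
            let r2 = lock.read(); // multiple shared guards may coexist
            assert_eq!(*r1 + *r2, 2);
            assert!(lock.try_write().is_none()); // exclusive access refused while readers exist
        }
        *lock.write() += 1; // temporary write guard is dropped at the end of the statement
        assert_eq!(*lock.read(), 2);
    }
}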