use std::cell::UnsafeCell; use std::sync::atomic::{AtomicU32, Ordering}; /// An unfair shared/exclusive lock. One may quickly describe this to be an /// unfair RwLock where a thread that wishes to write will get to write as fast /// as possible (i.e. waiting for readers to finish), but others writers in line /// may have to wait for another round of readers acquiring the lock. /// /// However, this is NOT a read/write lock. It is a shared/exclusive lock. It is /// used in concurrent datastructures (implemented with atomics), so particular /// kinds of writing may still occur by the threads holding a shared lock. In /// that case the programmer must make sure that these writes are coordinated /// in a thread-safe manner. /// /// It was designed with resizable (ring)buffers in mind: most often you have /// the standard atomic pointers/indices moving around in the ringbuffer. But /// when the buffer needs to be resized you need to be sure that no-one is /// reading/writing the wrong/old/deallocated buffer pointer. Hence the /// shared/exclusive terminology. /// /// For this reason the `UnfairSeLock` was written assuming that exclusive locks /// are only held sometime: shared locks are obtained most of the time. // Note: preliminary benchmark batches shows this is ~2x faster than a RwLock // when under some contention. pub struct UnfairSeLock { // Uses 31 bits to track number of shared locks, and the high bit is set if // an exclusive lock is supposed to be held. 31 bits is more than sufficient // because in this project shared locks will be held by individual threads. shared: AtomicU32, cell: UnsafeCell, } // Exclusive bit is set in the atomic value when a thread wishes to hold an // exclusive lock. const EXCLUSIVE_BIT: u32 = 1 << 31; impl UnfairSeLock { pub fn new(value: T) -> Self { return Self{ shared: AtomicU32::new(0), cell: UnsafeCell::new(value), } } /// Get shared access to the underlying data. #[must_use] pub fn lock_shared(&self) -> UnfairSeLockSharedGuard { let mut shared = self.shared.load(Ordering::Relaxed); loop { if shared & EXCLUSIVE_BIT != 0 { shared = self.wait_until_not_exclusive(shared); } // Spinlock until we've incremented. If we fail we need to check the // exclusive bit again. let new_shared = shared + 1; match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) { Ok(_) => return UnfairSeLockSharedGuard::new(self, new_shared), Err(actual_value) => { shared = actual_value; }, } } } /// Get exclusive access to the underlying data. #[must_use] pub fn lock_exclusive(&self) -> UnfairSeLockExclusiveGuard { let mut shared = self.shared.load(Ordering::Relaxed); loop { if shared & EXCLUSIVE_BIT != 0 { shared = self.wait_until_not_exclusive(shared); } // We want to set the write bit let new_shared = shared | EXCLUSIVE_BIT; match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) { Ok(_) => { // We've acquired the write lock, but we still might have // to wait until the reader count is at 0. shared = new_shared; if shared != EXCLUSIVE_BIT { shared = self.wait_until_not_shared(shared); } return UnfairSeLockExclusiveGuard::new(self); }, Err(actual_value) => { shared = actual_value; } } } } fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 { // Assume this is only called when the EXCLUSIVE_BIT is set debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT); loop { // So spin until no longer held shared = self.shared.load(Ordering::Acquire); if shared & EXCLUSIVE_BIT == 0 { return shared; } } } #[inline] fn wait_until_not_shared(&self, mut shared: u32) -> u32 { // This is only called when someone has signaled the exclusive bit, but // there are still threads holding the shared lock. loop { debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT); if shared == EXCLUSIVE_BIT { // shared count is 0 return shared; } shared = self.shared.load(Ordering::Acquire); } } } /// A guard signifying that the owner has shared access to the underlying /// `UnfairSeLock`. pub struct UnfairSeLockSharedGuard<'a, T> { lock: &'a UnfairSeLock, initial_value: u32, } impl<'a, T> UnfairSeLockSharedGuard<'a, T> { fn new(lock: &'a UnfairSeLock, initial_value: u32) -> Self { return Self{ lock, initial_value }; } /// Force retrieval of the underlying type `T` in the mutable sense. Note /// that the caller is now responsible for ensuring that concurrent mutable /// access takes place in a correct fashion. #[inline] pub unsafe fn get_mut(&self) -> &mut T { return unsafe{ &mut *self.lock.cell.get() }; } } impl<'a, T> Drop for UnfairSeLockSharedGuard<'a, T> { fn drop(&mut self) { // Spinlock until we've decremented the number of shared locks. let mut value = self.initial_value; while let Err(actual_value) = self.lock.shared.compare_exchange_weak( value, value - 1, Ordering::AcqRel, Ordering::Acquire ) { value = actual_value; } } } impl<'a, T> std::ops::Deref for UnfairSeLockSharedGuard<'a, T> { type Target = T; fn deref(&self) -> &Self::Target { return unsafe{ &*self.lock.cell.get() }; } } /// A guard signifying that the owner has exclusive access to the underlying /// `UnfairSeLock`. pub struct UnfairSeLockExclusiveGuard<'a, T> { lock: &'a UnfairSeLock, } impl<'a, T> UnfairSeLockExclusiveGuard<'a, T> { fn new(lock: &'a UnfairSeLock) -> Self { return Self{ lock }; } } impl<'a, T> Drop for UnfairSeLockExclusiveGuard<'a, T> { fn drop(&mut self) { // We have the exclusive bit set, and this type was constructed when // the number of shared locks was at 0, we can safely store a `0` into // the atomic debug_assert_eq!(self.lock.shared.load(Ordering::Relaxed), EXCLUSIVE_BIT); // relaxed because we acquired it before self.lock.shared.store(0, Ordering::Release); } } impl<'a, T> std::ops::Deref for UnfairSeLockExclusiveGuard<'a, T> { type Target = T; fn deref(&self) -> &Self::Target { return unsafe{ &*self.lock.cell.get() }; } } impl<'a, T> std::ops::DerefMut for UnfairSeLockExclusiveGuard<'a, T> { fn deref_mut(&mut self) -> &mut Self::Target { return unsafe{ &mut *self.lock.cell.get() }; } }