// CSY/reowolf/src/runtime2/store/unfair_se_lock.rs
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU32, Ordering};
/// An unfair shared/exclusive lock. One may quickly describe it as an unfair
/// RwLock where a thread that wishes to write gets to write as soon as
/// possible (i.e. it only waits for the current readers to finish), but other
/// writers in line may have to wait for another round of readers acquiring the
/// lock.
///
/// However, this is NOT a read/write lock. It is a shared/exclusive lock. It
/// is used in concurrent data structures (implemented with atomics), so
/// particular kinds of writes may still be performed by threads holding a
/// shared lock. In that case the programmer must make sure that these writes
/// are coordinated in a thread-safe manner.
///
/// It was designed with resizable (ring)buffers in mind: most of the time only
/// the standard atomic pointers/indices move around inside the ringbuffer, but
/// when the buffer needs to be resized you need to be sure that no one is still
/// reading from, or writing through, the old/deallocated buffer pointer. Hence
/// the shared/exclusive terminology.
///
/// For this reason the `UnfairSeLock` was written assuming that the exclusive
/// lock is held only occasionally: shared locks are obtained most of the time.
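///
/// A usage sketch (added here for illustration; not part of the original
/// documentation), with a resizable `Vec` standing in for the buffer storage:
///
/// ```ignore
/// let lock = UnfairSeLock::new(vec![0u32; 8]);
/// {
///     // Cheap shared access: the common path.
///     let shared = lock.lock_shared();
///     assert_eq!(shared.len(), 8);
/// } // the shared guard is dropped here, releasing its share of the count
///
/// // Exclusive access waits for all shared guards to be dropped, then allows
/// // mutation through `DerefMut`, e.g. to resize the storage.
/// let mut exclusive = lock.lock_exclusive();
/// exclusive.resize(16, 0);
/// assert_eq!(exclusive.len(), 16);
/// ```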
// Note: preliminary benchmarks show this to be roughly 2x faster than a
// standard `RwLock` when under some contention.
pub struct UnfairSeLock<T> {
    // The low 31 bits track the number of shared locks; the high bit is set if
    // an exclusive lock is supposed to be held. 31 bits is more than sufficient
    // because in this project shared locks are only held by individual threads.
shared: AtomicU32,
cell: UnsafeCell<T>,
}
// Exclusive bit is set in the atomic value when a thread wishes to hold an
// exclusive lock.
const EXCLUSIVE_BIT: u32 = 1 << 31;
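// NOTE: the following marker impl is an added sketch, not part of the original
// file. Because of the `UnsafeCell`, the compiler will not consider this type
// `Sync` on its own, so an impl along these lines (with bounds mirroring those
// of `std::sync::RwLock`) is assumed to be required somewhere for the lock to
// actually be shared between threads.
unsafe impl<T: Send + Sync> Sync for UnfairSeLock<T> {}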
impl<T> UnfairSeLock<T> {
pub fn new(value: T) -> Self {
return Self{
shared: AtomicU32::new(0),
cell: UnsafeCell::new(value),
}
}
/// Get shared access to the underlying data.
#[must_use]
pub fn lock_shared(&self) -> UnfairSeLockSharedGuard<T> {
let mut shared = self.shared.load(Ordering::Relaxed);
loop {
if shared & EXCLUSIVE_BIT != 0 {
shared = self.wait_until_not_exclusive(shared);
}
            // Spin until we've incremented the shared count. If the CAS fails
            // we need to check the exclusive bit again.
let new_shared = shared + 1;
match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => return UnfairSeLockSharedGuard::new(self, new_shared),
Err(actual_value) => { shared = actual_value; },
}
}
}
/// Get exclusive access to the underlying data.
#[must_use]
pub fn lock_exclusive(&self) -> UnfairSeLockExclusiveGuard<T> {
let mut shared = self.shared.load(Ordering::Relaxed);
loop {
if shared & EXCLUSIVE_BIT != 0 {
shared = self.wait_until_not_exclusive(shared);
}
            // We want to set the exclusive bit.
let new_shared = shared | EXCLUSIVE_BIT;
match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => {
                    // We've acquired the exclusive lock, but we might still
                    // have to wait until the shared count reaches 0.
shared = new_shared;
if shared != EXCLUSIVE_BIT {
shared = self.wait_until_not_shared(shared);
}
return UnfairSeLockExclusiveGuard::new(self);
},
Err(actual_value) => { shared = actual_value; }
}
}
}
fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 {
// Assume this is only called when the EXCLUSIVE_BIT is set
debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
loop {
// So spin until no longer held
shared = self.shared.load(Ordering::Acquire);
if shared & EXCLUSIVE_BIT == 0 {
return shared;
}
}
}
#[inline]
fn wait_until_not_shared(&self, mut shared: u32) -> u32 {
        // This is only called after the calling thread has set the exclusive
        // bit, but while other threads still hold the shared lock.
loop {
debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
if shared == EXCLUSIVE_BIT {
// shared count is 0
return shared;
}
shared = self.shared.load(Ordering::Acquire);
}
}
}
/// A guard signifying that the owner has shared access to the underlying
/// `UnfairSeLock`.
pub struct UnfairSeLockSharedGuard<'a, T> {
lock: &'a UnfairSeLock<T>,
initial_value: u32,
}
impl<'a, T> UnfairSeLockSharedGuard<'a, T> {
fn new(lock: &'a UnfairSeLock<T>, initial_value: u32) -> Self {
return Self{ lock, initial_value };
}
    /// Forces retrieval of a mutable reference to the underlying `T`. Note
    /// that the caller is now responsible for ensuring that concurrent mutable
    /// access takes place in a correct fashion.
#[inline]
pub unsafe fn get_mut(&self) -> &mut T {
return unsafe{ &mut *self.lock.cell.get() };
}
}
impl<'a, T> Drop for UnfairSeLockSharedGuard<'a, T> {
fn drop(&mut self) {
        // Spin until we've decremented the number of shared locks.
let mut value = self.initial_value;
while let Err(actual_value) = self.lock.shared.compare_exchange_weak(
value, value - 1, Ordering::AcqRel, Ordering::Acquire
) {
value = actual_value;
}
}
}
impl<'a, T> std::ops::Deref for UnfairSeLockSharedGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
return unsafe{ &*self.lock.cell.get() };
}
}
/// A guard signifying that the owner has exclusive access to the underlying
/// `UnfairSeLock`.
pub struct UnfairSeLockExclusiveGuard<'a, T> {
lock: &'a UnfairSeLock<T>,
}
impl<'a, T> UnfairSeLockExclusiveGuard<'a, T> {
fn new(lock: &'a UnfairSeLock<T>) -> Self {
return Self{ lock };
}
}
impl<'a, T> Drop for UnfairSeLockExclusiveGuard<'a, T> {
fn drop(&mut self) {
        // We have the exclusive bit set, and this type was constructed when
        // the number of shared locks was 0, so we can safely store a `0` into
        // the atomic.
debug_assert_eq!(self.lock.shared.load(Ordering::Relaxed), EXCLUSIVE_BIT); // relaxed because we acquired it before
self.lock.shared.store(0, Ordering::Release);
}
}
impl<'a, T> std::ops::Deref for UnfairSeLockExclusiveGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
return unsafe{ &*self.lock.cell.get() };
}
}
impl<'a, T> std::ops::DerefMut for UnfairSeLockExclusiveGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
return unsafe{ &mut *self.lock.cell.get() };
}
}
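// Added sketch, not part of the original file: a single-threaded smoke test of
// the shared/exclusive pattern described in the type-level documentation,
// using a resizable `Vec` as a stand-in for the ringbuffer storage.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn shared_then_exclusive() {
        let lock = UnfairSeLock::new(vec![0u32; 4]);

        // Multiple shared guards may be held at the same time.
        let a = lock.lock_shared();
        let b = lock.lock_shared();
        assert_eq!(a.len(), 4);
        assert_eq!(b.len(), 4);
        drop(a);
        drop(b);

        // Once all shared guards are dropped, exclusive access can mutate
        // (e.g. resize) the underlying data.
        let mut exclusive = lock.lock_exclusive();
        exclusive.resize(8, 0);
        drop(exclusive);

        // Shared access observes the new size.
        assert_eq!(lock.lock_shared().len(), 8);
    }
}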