Location: CSY/reowolf/src/runtime2/store/unfair_se_lock.rs

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU32, Ordering};

/// An unfair shared/exclusive lock. One may roughly describe it as an unfair
/// RwLock where a thread that wishes to write gets to write as fast as possible
/// (i.e. it only waits for the current readers to finish), but other writers in
/// line may have to wait for another round of readers acquiring the lock.
///
/// However, this is NOT a read/write lock. It is a shared/exclusive lock. It is
/// used in concurrent data structures (implemented with atomics), so particular
/// kinds of writes may still be performed by threads holding a shared lock. In
/// that case the programmer must make sure that these writes are coordinated
/// in a thread-safe manner.
///
/// It was designed with resizable (ring)buffers in mind: most often you have
/// the standard atomic pointers/indices moving around in the ringbuffer. But
/// when the buffer needs to be resized you need to be sure that no-one is
/// reading/writing the wrong/old/deallocated buffer pointer. Hence the
/// shared/exclusive terminology.
///
/// For this reason the `UnfairSeLock` was written assuming that exclusive locks
/// are held only rarely: shared locks are obtained most of the time.
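///
/// # Example
///
/// A minimal usage sketch (illustrative only; the `Vec<u32>` payload is an
/// assumption for this example, not how the runtime stores its buffers):
///
/// ```ignore
/// let lock = UnfairSeLock::new(vec![0u32; 8]);
///
/// // Any number of threads may hold the shared lock at the same time.
/// {
///     let shared = lock.lock_shared();
///     assert_eq!(shared.len(), 8);
/// } // shared guard dropped here, decrementing the shared count
///
/// // The exclusive lock waits for all shared guards to be dropped, then
/// // grants mutable access (e.g. to resize the buffer).
/// {
///     let mut exclusive = lock.lock_exclusive();
///     exclusive.resize(16, 0);
/// }
/// ```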
// Note: preliminary benchmarks show this is ~2x faster than an `RwLock` when
// under some contention.
pub struct UnfairSeLock<T> {
    // Uses the low 31 bits to track the number of shared locks; the high bit is
    // set if a thread holds, or is waiting to hold, the exclusive lock. 31 bits
    // is more than sufficient because in this project shared locks are held by
    // individual threads.
    shared: AtomicU32,
    cell: UnsafeCell<T>,
}

// Exclusive bit is set in the atomic value when a thread wishes to hold an
// exclusive lock.
const EXCLUSIVE_BIT: u32 = 1 << 31;
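
// `UnsafeCell` makes `UnfairSeLock` `!Sync` by default, while the whole point
// of the lock is to be shared between threads. This impl is a sketch using the
// same bound as `std::sync::RwLock`: shared guards hand out `&T` (requiring
// `T: Sync`) and the exclusive guard hands out `&mut T` (requiring `T: Send`).
unsafe impl<T: Send + Sync> Sync for UnfairSeLock<T> {}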

impl<T> UnfairSeLock<T> {
    pub fn new(value: T) -> Self {
        return Self{
            shared: AtomicU32::new(0),
            cell: UnsafeCell::new(value),
        }
    }

    /// Get shared access to the underlying data.
    #[must_use]
    pub fn lock_shared(&self) -> UnfairSeLockSharedGuard<T> {
        let mut shared = self.shared.load(Ordering::Relaxed);
        loop {
            if shared & EXCLUSIVE_BIT != 0 {
                shared = self.wait_until_not_exclusive(shared);
            }

            // Spinlock until we've incremented. If we fail we need to check the
            // exclusive bit again.
            let new_shared = shared + 1;
            match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return UnfairSeLockSharedGuard::new(self, new_shared),
                Err(actual_value) => { shared = actual_value; },
            }
        }
    }

    /// Get exclusive access to the underlying data.
    #[must_use]
    pub fn lock_exclusive(&self) -> UnfairSeLockExclusiveGuard<T> {
        let mut shared = self.shared.load(Ordering::Relaxed);
        loop {
            if shared & EXCLUSIVE_BIT != 0 {
                shared = self.wait_until_not_exclusive(shared);
            }

            // Try to set the exclusive bit.
            let new_shared = shared | EXCLUSIVE_BIT;
            match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => {
                    // We've acquired the exclusive lock, but we still might
                    // have to wait until the reader count drops to 0.
                    shared = new_shared;
                    if shared != EXCLUSIVE_BIT {
                        self.wait_until_not_shared(shared);
                    }

                    return UnfairSeLockExclusiveGuard::new(self);
                },
                Err(actual_value) => { shared = actual_value; }
            }
        }
    }

    fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 {
        // Assume this is only called when the EXCLUSIVE_BIT is set
        debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
        loop {
            // So spin until no longer held
            shared = self.shared.load(Ordering::Acquire);
            if shared & EXCLUSIVE_BIT == 0 {
                return shared;
            }
        }
    }

    #[inline]
    fn wait_until_not_shared(&self, mut shared: u32) -> u32 {
        // This is only called when someone has signaled the exclusive bit, but
        // there are still threads holding the shared lock.
        loop {
            debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
            if shared == EXCLUSIVE_BIT {
                // shared count is 0
                return shared;
            }

            shared = self.shared.load(Ordering::Acquire);
        }
    }
}

/// A guard signifying that the owner has shared access to the underlying
/// `UnfairSeLock`.
pub struct UnfairSeLockSharedGuard<'a, T> {
    lock: &'a UnfairSeLock<T>,
    initial_value: u32,
}

impl<'a, T> UnfairSeLockSharedGuard<'a, T> {
    fn new(lock: &'a UnfairSeLock<T>, initial_value: u32) -> Self {
        return Self{ lock, initial_value };
    }

    /// Forcibly retrieve a mutable reference to the underlying `T`. Note that
    /// the caller is now responsible for ensuring that concurrent mutable
    /// access is coordinated in a thread-safe manner.
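    ///
    /// A sketch of the intended pattern (`claimed_index` and `new_value` are
    /// hypothetical placeholders; the caller must have claimed the slot through
    /// its own synchronization, e.g. an atomic index, and `T` is assumed to be
    /// an indexable buffer here):
    ///
    /// ```ignore
    /// let guard = lock.lock_shared();
    /// // SAFETY: `claimed_index` was reserved atomically, so no other thread
    /// // writes to this slot while we do.
    /// unsafe { guard.get_mut()[claimed_index] = new_value; }
    /// ```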
    #[inline]
    pub unsafe fn get_mut(&self) -> &mut T {
        return unsafe{ &mut *self.lock.cell.get() };
    }
}

impl<'a, T> Drop for UnfairSeLockSharedGuard<'a, T> {
    fn drop(&mut self) {
        // Spinlock until we've decremented the number of shared locks.
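        // A waiting writer may already have set the exclusive bit; the
        // decrement still works because the shared count lives in the low 31
        // bits, and that writer (spinning in `wait_until_not_shared`) will
        // observe the count reaching zero.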
        let mut value = self.initial_value;
        while let Err(actual_value) = self.lock.shared.compare_exchange_weak(
            value, value - 1, Ordering::AcqRel, Ordering::Acquire
        ) {
            value = actual_value;
        }
    }
}

impl<'a, T> std::ops::Deref for UnfairSeLockSharedGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        return unsafe{ &*self.lock.cell.get() };
    }
}

/// A guard signifying that the owner has exclusive access to the underlying
/// `UnfairSeLock`.
pub struct UnfairSeLockExclusiveGuard<'a, T> {
    lock: &'a UnfairSeLock<T>,
}

impl<'a, T> UnfairSeLockExclusiveGuard<'a, T> {
    fn new(lock: &'a UnfairSeLock<T>) -> Self {
        return Self{ lock };
    }
}

impl<'a, T> Drop for UnfairSeLockExclusiveGuard<'a, T> {
    fn drop(&mut self) {
        // We have the exclusive bit set, and this guard was only constructed
        // once the number of shared locks reached 0, so we can safely store `0`
        // into the atomic.
        debug_assert_eq!(self.lock.shared.load(Ordering::Relaxed), EXCLUSIVE_BIT); // relaxed because we acquired it before
        self.lock.shared.store(0, Ordering::Release);
    }
}

impl<'a, T> std::ops::Deref for UnfairSeLockExclusiveGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        return unsafe{ &*self.lock.cell.get() };
    }
}

impl<'a, T> std::ops::DerefMut for UnfairSeLockExclusiveGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        return unsafe{ &mut *self.lock.cell.get() };
    }
}
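
// An illustrative smoke test (a sketch, not exhaustive): it exercises the
// shared fast path from several threads and a single exclusive resize,
// mirroring the resizable-buffer use case described above.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn shared_then_exclusive() {
        let lock = Arc::new(UnfairSeLock::new(vec![0u32; 8]));

        // Several readers may hold the shared lock concurrently.
        let readers: Vec<_> = (0..4).map(|_| {
            let lock = lock.clone();
            thread::spawn(move || {
                let guard = lock.lock_shared();
                assert_eq!(guard.len(), 8);
            })
        }).collect();
        for reader in readers {
            reader.join().unwrap();
        }

        // A single writer resizes the buffer under the exclusive lock.
        {
            let mut guard = lock.lock_exclusive();
            guard.resize(16, 0);
        }
        assert_eq!(lock.lock_shared().len(), 16);
    }
}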