Changeset - fafdf8723ee3
MH - 2022-01-05 18:12:43
contact@maxhenger.nl
WIP: Busy with MPSC channel

Will act as backing buffer for messages (for now).
4 files changed with 96 insertions and 31 deletions:
src/collections/mod.rs
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod raw_vec; // TODO: Delete?
 
mod raw_array;
 

	
 
// mod freelist;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::{DequeSet, VecSet};
 
-pub(crate) use raw_vec::RawVec;
\ No newline at end of file
+pub(crate) use raw_vec::RawVec;
+pub(crate) use raw_array::RawArray;
\ No newline at end of file
src/collections/raw_array.rs
 
use std::{mem, ptr};
 
use std::alloc::{Layout, alloc, dealloc};
 

	
 
/// A very simple resizable array. It never calls element destructors; it only
/// makes sure the backing allocation is freed when dropped, and allows the
/// user to ergonomically resize. Assumes that `size_of::<T>() != 0` (and
/// checks this in debug mode).
 
pub struct RawArray<T> {
 
    data: *mut T,
 
    count: usize,
 
}
 

	
 
impl<T> RawArray<T> {
 
    const SIZE: usize = mem::size_of::<T>();
 
    const ALIGNMENT: usize = mem::align_of::<T>();
 

	
 
    /// Constructs a new and empty (not allocated) array.
 
    pub fn new() -> Self {
 
        return Self{
 
            data: ptr::null_mut(),
 
            count: 0,
 
        }
 
    }
 

	
 
    /// Resizes the array. Elements that still fall within the new bounds are
    /// preserved bitwise (whether their bytes are initialized or not); when
    /// shrinking, elements past the new length are lost without their
    /// destructors being run.
 
    pub fn resize(&mut self, new_count: usize) {
 
        if new_count > self.count {
 
            let new_data = Self::allocate(new_count);
 
            if !self.data.is_null() {
 
                unsafe {
 
                    ptr::copy_nonoverlapping(self.data, new_data, self.count);
 
                    Self::deallocate(self.data, self.count);
 
                }
 
            }
 

	
 
            self.data = new_data;
 
            self.count = new_count;
 
        } else if new_count < self.count {
 
            // `new_count < count` and `new_count >= 0`, so `data != null`.
 
            if new_count == 0 {
 
                Self::deallocate(self.data, self.count);
 
                self.data = ptr::null_mut();
 
                self.count = 0;
 
            } else {
 
                let new_data = Self::allocate(new_count);
 
                unsafe {
 
                    ptr::copy_nonoverlapping(self.data, new_data, new_count);
 
                    Self::deallocate(self.data, self.count);
 
                }
 
                self.data = new_data;
 
                self.count = new_count;
 
            }
 
        } // otherwise: equal
 
    }
 

	
 
    /// Retrieves a mutable pointer to the value at the specified index. The
    /// returned `*mut T` may point to uninitialized memory.
 
    pub fn get(&self, index: usize) -> *mut T {
 
        debug_assert!(index < self.count); // at least some safety, amirite?
 
        return unsafe{ self.data.add(index) };
 
    }
 

	
 
    /// Retrieves the base pointer of the array. It may be null (when nothing
    /// has been allocated yet) or point to uninitialized memory.
 
    pub fn data(&self) -> *mut T {
 
        return self.data;
 
    }
 

	
 
-    /// Returns the length of the array.
-    pub fn len(&self) -> usize {
+    /// Returns the capacity of the array.
+    pub fn cap(&self) -> usize {
 
        return self.count;
 
    }
 

	
 
    fn allocate(count: usize) -> *mut T {
 
        debug_assert_ne!(Self::SIZE, 0);
 
        let size = count * Self::SIZE;
 
        unsafe {
 
            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
 
            let data = alloc(layout);
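            // Note: `alloc` returns a null pointer if allocation fails; that
            // case is not checked here.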
 
            return mem::transmute(data);
 
        }
 
    }
 

	
 
    fn deallocate(data: *mut T, count: usize) {
 
        let size = count * Self::SIZE;
 
        unsafe {
 
            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
 
            dealloc(mem::transmute(data), layout);
 
        }
 
    }
 
}
 

	
 
impl<T> Drop for RawArray<T> {
 
    fn drop(&mut self) {
 
        if !self.data.is_null() {
 
            Self::deallocate(self.data, self.count);
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 

	
 
    fn fill(array: &mut RawArray<usize>, count: usize) {
 
        for idx in 0..count {
 
            unsafe{ *array.get(idx) = idx }
 
        }
 
    }
 

	
 
    fn check(array: &RawArray<usize>, count: usize) {
 
        for idx in 0..count {
 
            assert_eq!(unsafe{ *array.get(idx) }, idx);
 
        }
 
    }
 

	
 
    #[test]
 
    fn drop_empty_array() {
 
        let array = RawArray::<u32>::new();
 
-        assert_eq!(array.len(), 0);
+        assert_eq!(array.cap(), 0);
 
        assert_eq!(array.data(), ptr::null_mut());
 
    }
 

	
 
    #[test]
 
    fn increase_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 

	
 
        for grow_idx in 0..NUM_RESIZES {
 
            let new_size = INIT_SIZE + grow_idx * 4;
 
            array.resize(new_size);
 
-            assert_eq!(array.len(), new_size);
+            assert_eq!(array.cap(), new_size);
 
        }
 

	
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn maintain_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        for _idx in 0..NUM_RESIZES {
 
            array.resize(INIT_SIZE);
 
-            assert_eq!(array.len(), INIT_SIZE);
+            assert_eq!(array.cap(), INIT_SIZE);
 
        }
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn decrease_size() {
 
        const INIT_SIZE: usize = 16;
 
        const FINAL_SIZE: usize = 8;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        array.resize(FINAL_SIZE);
 
        check(&array, FINAL_SIZE);
 
    }
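    // Added example (not part of the original changeset): a minimal usage
    // sketch of the RawArray API under the semantics documented above (no
    // destructor calls, the caller initializes slots before reading them).
    #[test]
    fn basic_usage_sketch() {
        let mut array = RawArray::<u64>::new();
        array.resize(8);
        for idx in 0..8 {
            // Slots start out uninitialized; write before reading.
            unsafe { *array.get(idx) = idx as u64 * 2; }
        }
        assert_eq!(array.cap(), 8);
        assert_eq!(unsafe { *array.get(3) }, 6);

        // Shrinking keeps the elements that remain in bounds.
        array.resize(4);
        assert_eq!(array.cap(), 4);
        assert_eq!(unsafe { *array.get(3) }, 6);
    }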
 

	
 
    #[test]
 
    fn increase_and_decrease_size() {
 
        let sizes = [12, 8, 6, 150, 128, 32, 16, 90, 4, 18, 27];
 
        let min_size = *sizes.iter().min().unwrap();
 
        let max_size = *sizes.iter().max().unwrap();
 

	
 
        let mut array = RawArray::new();
 
        array.resize(max_size);
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
            array.resize(size);
 
-            assert_eq!(array.len(), size);
+            assert_eq!(array.cap(), size);
 
        }
 
        check(&array, min_size);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/component.rs
 
@@ -24,384 +24,386 @@
 
 *    executed. Better to quickly be able to execute code than being able to
 
 *    quickly tear down finished components.
 
 *  - Retrieval is much more likely than creation/destruction.
 
 *
 
 * Some obvious flaws with this implementation:
 
 *  - Because of the freelist implementation we will generally end up
 *    allocating all of the data pointers that are available (i.e. if we have a
 *    buffer of size 64, but generally use only 33 elements, then we'll still
 *    have 64 elements allocated), which might be wasteful at larger array
 *    sizes (which are always powers of two).
 *  - A lot of the concurrent operations are not strictly necessary: we could
 *    avoid some of the accesses to the global concurrent datastructure by
 *    first going through some kind of thread-local datastructure.
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{alloc, dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
pub struct ComponentStore<T: Sized> {
 
    inner: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicUsize,
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
}
 

	
 
unsafe impl<T: Sized> Send for ComponentStore<T>{}
 
unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 

	
 
struct Inner<T: Sized> {
 
    freelist: Vec<u32>,
 
    data: Vec<*mut T>,
 
    size: usize,
 
    compare_mask: usize,
 
    index_mask: usize,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
        Self::assert_valid_size(initial_size);
 

	
 
        // Fill initial freelist and preallocate data array
 
        let mut initial_freelist = Vec::with_capacity(initial_size);
 
        for idx in 0..initial_size {
 
            initial_freelist.push(idx as u32)
 
        }
 

	
 
        let mut initial_data = Vec::new();
 
        initial_data.resize(initial_size, ptr::null_mut());
 

	
 
        // Return initial store
 
        return Self{
 
            inner: UnfairSeLock::new(Inner{
 
                freelist: initial_freelist,
 
                data: initial_data,
 
                size: initial_size,
 
                compare_mask: 2*initial_size - 1,
 
                index_mask: initial_size - 1,
 
            }),
 
            read_head: AtomicUsize::new(0),
 
            write_head: AtomicUsize::new(initial_size),
 
            limit_head: AtomicUsize::new(initial_size),
 
        };
 
    }
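    // Illustrative note on the indexing scheme (added for clarity): the three
    // heads run over the range [0, 2*size) and are masked on use. With
    // `initial_size == 4` we get `index_mask == 0b011` and
    // `compare_mask == 0b111`; `read_head` starts at 0 while `write_head` and
    // `limit_head` start at 4. Popping a free index reads
    // `freelist[read_head & index_mask]` and bumps `read_head`; once
    // `read_head` catches up with `limit_head` the freelist is empty and
    // `reallocate` doubles the store.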
 

	
 
    /// Creates a new element initialized to the provided `value`. This returns
 
    /// the index at which the element can be retrieved.
 
    pub fn create(&self, value: T) -> u32 {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        self.initialize_at_index(lock, index, value);
 
        return index;
 
    }
 

	
 
    /// Destroys an element at the provided `index`. The caller must make sure
 
    /// that it does not use any previously received references to the data at
 
    /// this index, and that no more calls to `get` are performed using this
 
    /// index. This is allowed again if the index has been reacquired using
 
    /// `create`.
 
    pub fn destroy(&self, index: u32) {
 
        let lock = self.inner.lock_shared();
 
        self.destruct_at_index(&lock, index);
 
        self.push_freelist_index(&lock, index);
 
    }
 

	
 
    /// Retrieves an element by reference
 
    pub fn get(&self, index: u32) -> &T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &*value;
 
        }
 
    }
 

	
 
    /// Retrieves an element by mutable reference. The caller must ensure that
    /// its use of that mutability is thread-safe.
 
    pub fn get_mut(&self, index: u32) -> &mut T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &mut *value;
 
        }
 
    }
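    // A minimal usage sketch (illustrative; the element type and value are
    // arbitrary): `create` hands out a stable index, `get`/`get_mut` look the
    // element up, and `destroy` runs the destructor and recycles the index.
    //
    //     let store = ComponentStore::new(32);
    //     let index = store.create(5u64);
    //     assert_eq!(*store.get(index), 5);
    //     store.destroy(index);
    //     // `index` may now be handed out again by a later `create`.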
 

	
 
    #[inline]
 
    fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) {
 
        'attempt_read: loop {
 
            // Load indices and check for reallocation condition
 
            let current_size = read_lock.size;
 
            let mut read_index = self.read_head.load(Ordering::Relaxed);
 
            let limit_index = self.limit_head.load(Ordering::Acquire);
 

	
 
            if read_index == limit_index {
 
                read_lock = self.reallocate(current_size, read_lock);
 
                continue 'attempt_read;
 
            }
 

	
 
            loop {
 
                let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & read_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (read_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(&self, read_lock: InnerRead<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerRead<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        while let Err(actual_write_index) = self.write_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            cur_write_index = actual_write_index;
 
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        }
 

	
 
        // We own the freelist slot at this index: write to it, then notify the
        // reader through `limit_head` that it can be read. Note that we cheat
        // around the Rust mutability system here :)
 
        unsafe {
 
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
 
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
 
        }
 

	
 
        // Essentially a spin loop. The relaxed failure ordering is fine
        // because a writer always moves `write_head` first and `limit_head`
        // second, so we are simply waiting for earlier writers to publish.
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
 
            let mut lock = self.inner.lock_exclusive();
 

	
 
            if old_size == lock.size {
 
                // We are the thread that is supposed to reallocate
 
                let new_size = old_size * 2;
 
                Self::assert_valid_size(new_size);
 

	
 
                // Note that the atomic indices are in the range [0, new_size)
 
                // already, so we need to be careful
 
                let new_index_mask = new_size - 1;
 
                let new_compare_mask = (2 * new_size) - 1;
 
                lock.data.resize(new_size, ptr::null_mut());
 
                lock.freelist.resize(new_size, 0);
 
                for idx in 0..old_size {
 
                    lock.freelist[old_size + idx] = lock.freelist[idx];
 
                }
 

	
 
                // We need to fill the freelist with the indices of all of the
 
                // new elements that we have just created.
 
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
 
                let old_read_index = self.read_head.load(Ordering::SeqCst);
 
                let old_write_index = self.write_head.load(Ordering::SeqCst);
 

	
 
                if old_read_index > old_write_index {
 
                    // Read index wraps, so keep it as-is and fill
 
                    let new_read_index = old_read_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (new_read_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    self.read_head.store(new_read_index, Ordering::SeqCst);
 
                    debug_assert!(new_read_index < 2*new_size);
 
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
 
                } else {
 
                    // No wrapping, so increment write index
 
                    let new_write_index = old_write_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (old_write_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    // Update write/limit heads
 
                    self.write_head.store(new_write_index, Ordering::SeqCst);
 
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
 
                    debug_assert!(new_write_index < 2*new_size);
 
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
 
                }
 

	
 
                // Update sizes and masks
 
                lock.size = new_size;
 
                lock.compare_mask = new_compare_mask;
 
                lock.index_mask = new_index_mask;
 
            } // else: someone else allocated, so we don't have to
 
        }
 

	
 
        // We've dropped the write lock, acquire the read lock again
 
        return self.inner.lock_shared();
 
    }
 

	
 
    #[inline]
 
    fn assert_valid_size(size: usize) {
 
        // Conditions the size needs to adhere to. Some are a bit excessive,
        // but we don't hit this check very often.
 
        assert!(
 
            size.is_power_of_two() &&
 
                size >= 4 &&
 
                size <= usize::MAX / 2 &&
 
                size <= u32::MAX as usize
 
        );
 
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if an index is present in the freelist, then its element's
        // destructor has already been called. So handle those entries first.
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    pub struct Resource {
 
        dtor: Arc<AtomicU64>,
 
        val: u64,
 
    }
 

	
 
    impl Resource {
 
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
 
            ctor.fetch_add(1, Ordering::SeqCst);
 
            return Self{ dtor, val };
 
        }
 
    }
 

	
 
    impl Drop for Resource {
 
        fn drop(&mut self) {
 
            self.dtor.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
 
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
 
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
 
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
 
        ]
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_unthreaded() {
 
        const NUM_ROUNDS: usize = 5;
 
        const NUM_ELEMENTS: usize = 1024;
 

	
 
        let store = ComponentStore::new(32);
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
 
        for _round_index in 0..NUM_ROUNDS {
 
            // Creation round
 
            for value in 0..NUM_ELEMENTS {
 
                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
 
                let new_index = store.create(new_resource);
 
                indices.push(new_index);
 
            }
 

	
 
            // Checking round
 
            for el_index in indices.iter().copied() {
 
                let element = store.get(el_index);
 
                assert_eq!(element.val, el_index as u64);
 
            }
 

	
 
            // Destruction round
 
            for el_index in indices.iter().copied() {
 
                store.destroy(el_index);
 
            }
 

	
 
            indices.clear();
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
src/runtime2/store/queue_mpsc.rs
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
-use super::unfair_se_lock::{UnfairSeLock};
+use crate::collections::RawArray;
+use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
-pub struct QueueDynMpsc<T: 'static> {
+pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// The queue itself may be moved between threads, as long as there is only one
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
-struct Shared<T: 'static> {
+struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
-    read_head: u32,
+    read_head: AtomicU32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
}
 

	
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
-    data: Vec<T>,
+    data: RawArray<T>,
 
    compare_mask: u32,
 
    read_mask: u32,
 
}
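// Illustrative example of the head encoding (added for clarity): the heads are
// stored in the range [0, 2*capacity) so that a full queue can be told apart
// from an empty one without a separate counter. With a capacity of 4
// (compare_mask = 7, read_mask = 3) the queue starts with read_head = 0 and
// write_head = limit_head = 4: `(read + capacity) & compare_mask == limit`
// means empty. After one push, limit_head = 5 and the consumer reads slot
// `read_head & read_mask == 0`; after four pushes write_head wraps to 0, and
// `write_head == read_head` signals that the queue is full and must grow.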
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T> QueueDynMpsc<T> {
 
    /// Constructs a new MPSC queue. Note that the initial capacity is always
 
    /// increased to the next power of 2 (if it isn't already).
 
    pub fn new(initial_capacity: usize) -> Self {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        Self::assert_correct_size(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 

	
 
        let initial_capacity = initial_capacity as u32;
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
-                    data: Vec::with_capacity(initial_capacity),
-                    compare_mask: (2 * initial_capacity as u32) - 1,
-                    read_mask: initial_capacity as u32 - 1,
+                    data,
+                    compare_mask: (2 * initial_capacity) - 1,
+                    read_mask: initial_capacity - 1,
 
                }),
 
-                read_head: 0,
-                write_head: AtomicU32::new(0),
-                limit_head: AtomicU32::new(0),
+                read_head: AtomicU32::new(0),
+                write_head: AtomicU32::new(initial_capacity),
+                limit_head: AtomicU32::new(initial_capacity),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
    }
 

	
 
    /// Performs an attempted read from the queue. It might be that some
    /// producer is putting something into the queue while this function is
    /// executing, in which case we don't get to consume it.
 
-    pub fn read(&mut self) -> Option<T> {
-        let cur_read = self.inner.read_head;
-        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+    pub fn pop(&mut self) -> Option<T> {
+        let data_lock = self.inner.data.lock_shared();
+        let cur_read = self.inner.read_head.load(Ordering::Acquire);
+        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+        let buf_size = data_lock.data.cap();
 

	
 
-        if cur_read != cur_limit {
+        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
-                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
-                self.inner.read_head += 1;
+                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
+                // We can perform a store since we're the only ones modifying
+                // the atomic.
+                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(std::ptr::read(source));
 
            }
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn assert_correct_size(capacity: usize) {
 
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
    fn drop(&mut self) {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
-        // have
-        while self.inner.read_head != write_index {
+        // have its destructor called. We immediately apply the
+        // increment-by-size trick and wait until we've hit the write head.
+        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
+        read_index += data_lock.data.cap();
+        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
                std::ptr::drop_in_place(target);
 
            }
 
            read_index += 1;
 
        }
 
    }
 
}
 

	
 
-pub struct QueueDynProducer<T: 'static> {
-    queue: &'static Shared<T>,
+pub struct QueueDynProducer<T> {
+    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
-            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
+            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let data_lock = queue.data.lock_shared();
 
        let read_index = queue.read_head.load(Ordering::Acquire);
 
        let write_index = queue.write_head.load(Ordering::Acquire);
 
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
            let expected_capacity = data_lock.data.cap();
 
        }
 
    }
 

	
 
    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
 
        drop(shared_lock);
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let exclusive_lock = self.queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
                for idx in old_capacity..new_capacity {
 
                    unsafe {
 
                        let target = exclusive_lock.data.get(idx);
 
                        let source = exclusive_lock.data.get(idx - old_capacity);
 
                        std::ptr::write(target, std::ptr::read(source));
 
                    }
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer.
 
                
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
-        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
+        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
 
// The producer end is `Send`, because in debug mode we make sure that there
// are no more producers when the queue is destroyed. But it is not `Sync`,
// because that would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
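// A minimal usage sketch of the intended API (illustrative only; `push` is
// still a work in progress in this changeset). The consumer owns the
// `QueueDynMpsc` and hands out `QueueDynProducer` handles:
//
//     let mut queue = QueueDynMpsc::<u64>::new(16);
//     let producer = queue.producer();
//
//     std::thread::scope(|s| {
//         // The producer handle is moved to, and used from, another thread.
//         s.spawn(move || {
//             producer.push(42);
//         });
//     });
//
//     // Consumer side: `pop` returns `None` once the queue is empty.
//     while let Some(value) = queue.pop() {
//         assert_eq!(value, 42);
//     }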
 
\ No newline at end of file