Changeset - fafdf8723ee3
[Not reviewed]
0 files added, 4 files modified, 0 files deleted
MH - 3 years ago 2022-01-05 18:12:43
contact@maxhenger.nl
WIP: Busy with MPSC channel

Will act as backing buffer for messages (for now).
4 files changed with 96 insertions and 31 deletions:
0 comments (0 inline, 0 general)
src/collections/mod.rs
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod raw_vec; // TODO: Delete?
 
mod raw_array;
 

	
 
// mod freelist;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::{DequeSet, VecSet};
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
 
pub(crate) use raw_vec::RawVec;
 
pub(crate) use raw_array::RawArray;
 
\ No newline at end of file
src/collections/raw_array.rs
 
@@ -58,26 +58,26 @@ impl<T> RawArray<T> {
 
    /// returned `*mut T` may point to bogus uninitialized memory.
 
    pub fn get(&self, index: usize) -> *mut T {
 
        debug_assert!(index < self.count); // at least some safety, amirite?
 
        return unsafe{ self.data.add(index) };
 
    }
 

	
 
    /// Retrieves the base pointer of the array. Hence may be null, or may point
 
    /// to bogus uninitialized memory.
 
    pub fn data(&self) -> *mut T {
 
        return self.data;
 
    }
 

	
 
    /// Returns the length of the array.
 
    pub fn len(&self) -> usize {
 
    /// Returns the capacity of the array.
 
    pub fn cap(&self) -> usize {
 
        return self.count;
 
    }
 

	
 
    fn allocate(count: usize) -> *mut T {
 
        debug_assert_ne!(Self::SIZE, 0);
 
        let size = count * Self::SIZE;
 
        unsafe {
 
            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
 
            let data = alloc(layout);
 
            return mem::transmute(data);
 
        }
 
    }
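The accessors above hand out raw pointers into possibly uninitialized storage, so every slot has to be initialized with ptr::write before it is read back; the rest of this changeset also drops stored values manually with ptr::drop_in_place. A small usage sketch, assuming only the new/resize/get/cap API shown in this file:

fn raw_array_usage_sketch() {
    let mut array = RawArray::<usize>::new();
    array.resize(8);

    // Initialize every slot before reading it; `get` points at uninitialized
    // memory until then.
    for idx in 0..array.cap() {
        unsafe { std::ptr::write(array.get(idx), idx); }
    }
    for idx in 0..array.cap() {
        assert_eq!(unsafe { *array.get(idx) }, idx);
    }
    // `usize` needs no drop; for other `T` the caller would drop the values
    // manually, as the queue code in this changeset does.
}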
 
@@ -109,56 +109,56 @@ mod tests {
 
        }
 
    }
 

	
 
    fn check(array: &RawArray<usize>, count: usize) {
 
        for idx in 0..count {
 
            assert_eq!(unsafe{ *array.get(idx) }, idx);
 
        }
 
    }
 

	
 
    #[test]
 
    fn drop_empty_array() {
 
        let array = RawArray::<u32>::new();
 
        assert_eq!(array.len(), 0);
 
        assert_eq!(array.cap(), 0);
 
        assert_eq!(array.data(), ptr::null_mut());
 
    }
 

	
 
    #[test]
 
    fn increase_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 

	
 
        for grow_idx in 0..NUM_RESIZES {
 
            let new_size = INIT_SIZE + grow_idx * 4;
 
            array.resize(new_size);
 
            assert_eq!(array.len(), new_size);
 
            assert_eq!(array.cap(), new_size);
 
        }
 

	
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn maintain_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        for _idx in 0..NUM_RESIZES {
 
            array.resize(INIT_SIZE);
 
            assert_eq!(array.len(), INIT_SIZE);
 
            assert_eq!(array.cap(), INIT_SIZE);
 
        }
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn decrease_size() {
 
        const INIT_SIZE: usize = 16;
 
        const FINAL_SIZE: usize = 8;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
@@ -168,17 +168,17 @@ mod tests {
 

	
 
    #[test]
 
    fn increase_and_decrease_size() {
 
        let sizes = [12, 8, 6, 150, 128, 32, 16, 90, 4, 18, 27];
 
        let min_size = *sizes.iter().min().unwrap();
 
        let max_size = *sizes.iter().max().unwrap();
 

	
 
        let mut array = RawArray::new();
 
        array.resize(max_size);
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
            array.resize(size);
 
            assert_eq!(array.len(), size);
 
            assert_eq!(array.cap(), size);
 
        }
 
        check(&array, min_size);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/component.rs
 
@@ -204,24 +204,26 @@ impl<T: Sized> ComponentStore<T> {
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
 
            let mut lock = self.inner.lock_exclusive();
 

	
 
            if old_size == lock.size {
 
                // We are the thread that is supposed to reallocate
 
                let new_size = old_size * 2;
 
                Self::assert_valid_size(new_size);
 

	
 
                // Note that the atomic indices are in the range [0, new_size)
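The `reallocate` above shows the double-checked pattern this store relies on: the shared guard is dropped, the exclusive lock is taken, and the observed size is compared again, because another thread may have completed the reallocation in the meantime. A standalone illustration of the same pattern, using std's RwLock instead of UnfairSeLock purely for the sketch:

use std::sync::RwLock;

// The caller observed `observed_len` under a shared guard and dropped that
// guard before calling this.
fn grow_if_still_needed(storage: &RwLock<Vec<u32>>, observed_len: usize) {
    let mut exclusive = storage.write().unwrap();
    if exclusive.len() == observed_len {
        // We are the thread that is supposed to reallocate.
        exclusive.resize(observed_len * 2, 0);
    }
    // Otherwise another thread already grew the storage; nothing to do.
}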
src/runtime2/store/queue_mpsc.rs
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock};
 
use crate::collections::RawArray;
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
pub struct QueueDynMpsc<T: 'static> {
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// One may move around the queue between threads, as long as there is only one
 
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
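A rough usage sketch of the queue described above (not part of the diff, and assuming `push` behaves as its signature suggests once it is finished): the consumer owns the QueueDynMpsc, and producer handles, being Send, may be moved to other threads.

fn queue_usage_sketch() {
    let mut queue = QueueDynMpsc::<u64>::new(16);
    let producer = queue.producer();

    // The producer handle is `Send`, so it may be moved to another thread.
    let worker = std::thread::spawn(move || {
        for value in 0..8u64 {
            producer.push(value);
        }
        // The producer is dropped at the end of this closure, before the
        // queue itself is dropped, which the debug counter checks.
    });
    worker.join().unwrap();

    // The consumer pops on its own thread.
    let mut received = Vec::new();
    while let Some(value) = queue.pop() {
        received.push(value);
    }
    assert_eq!(received.len(), 8);
}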
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T: 'static> {
 
struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
    read_head: u32,
 
    read_head: AtomicU32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
}
 

	
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
    data: Vec<T>,
 
    data: RawArray<T>,
 
    compare_mask: u32,
 
    read_mask: u32,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T> QueueDynMpsc<T> {
 
    /// Constructs a new MPSC queue. Note that the initial capacity is always
 
    /// increased to the next power of 2 (if it isn't already).
 
    pub fn new(initial_capacity: usize) -> Self {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        Self::assert_correct_size(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 

	
 
        let initial_capacity = initial_capacity as u32;
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
                    data: Vec::with_capacity(initial_capacity),
 
                    compare_mask: (2 * initial_capacity as u32) - 1,
 
                    read_mask: initial_capacity as u32 - 1,
 
                    data,
 
                    compare_mask: (2 * initial_capacity) - 1,
 
                    read_mask: initial_capacity - 1,
 
                }),
 
                read_head: 0,
 
                write_head: AtomicU32::new(0),
 
                limit_head: AtomicU32::new(0),
 
                read_head: AtomicU32::new(0),
 
                write_head: AtomicU32::new(initial_capacity),
 
                limit_head: AtomicU32::new(initial_capacity),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
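The masks and initial head values chosen here define the index scheme that `pop` below (and the full check in `push`) rely on: logical indices live in [0, 2 * capacity), `read_mask` selects the physical slot, `compare_mask` wraps the logical index, and the write/limit heads start one capacity ahead of the read head. A standalone illustration for capacity 16 (an arbitrary example value):

fn index_scheme_example() {
    let capacity: u32 = 16;
    let read_mask = capacity - 1;        // selects the physical slot
    let compare_mask = 2 * capacity - 1; // wraps the logical index

    // Initial state from `new()`.
    let read_head: u32 = 0;
    let limit_head: u32 = capacity;
    let write_head: u32 = capacity;

    // Empty check used by `pop()`: nothing to read yet.
    assert_eq!((read_head + capacity) & compare_mask, limit_head);

    // Full check used by `push()`: after `capacity` pushes the write head has
    // wrapped around onto the read head's value.
    assert_eq!((write_head + capacity) & compare_mask, read_head);

    // Slot addressing: logical indices 17 and 1 map to the same physical slot.
    assert_eq!(17 & read_mask, 1 & read_mask);
}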
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
    }
 

	
 
    /// Perform an attempted read from the queue. It might be that some producer
 
    /// is putting something in the queue while this function is executing, and
 
    /// we don't get to consume it.
 
    pub fn read(&mut self) -> Option<T> {
 
        let cur_read = self.inner.read_head;
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
    pub fn pop(&mut self) -> Option<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32; // cap() returns usize, the heads are u32
 

	
 
        if cur_read != cur_limit {
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
 
                self.inner.read_head += 1;
 
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
 
                // We can perform a store since we're the only ones modifying
 
                // the atomic.
 
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(std::ptr::read(source));
 
            }
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn assert_correct_size(capacity: usize) {
 
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
    fn drop(&mut self) {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
        // have
 
        while self.inner.read_head != write_index {
 

	
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap() as u32;
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
                std::ptr::drop_in_place(target);
 
            }
 
            read_index += 1;
 
        }
 
    }
 
}
 

	
 
pub struct QueueDynProducer<T: 'static> {
 
    queue: &'static Shared<T>,
 
pub struct QueueDynProducer<T> {
 
    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
 
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let data_lock = queue.data.lock_shared();
 
        let read_index = queue.read_head.load(Ordering::Acquire);
 
        let write_index = queue.write_head.load(Ordering::Acquire);
 
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
            let expected_capacity = data_lock.data.cap();
 
        }
 
    }
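`push` is left unfinished in this commit. One plausible continuation (purely a sketch, not the author's code): reserve a logical slot by advancing `write_head` within [0, 2 * capacity), write the value, then publish it by advancing `limit_head` in reservation order, mirroring the compare_exchange loop in ComponentStore above. The full/resize branch checked above is omitted here.

fn push_continuation_sketch<T>(queue: &Shared<T>, data_lock: &InnerRead<T>, value: T) {
    // Reserve a slot, keeping `write_head` within [0, 2 * capacity).
    let mut write_index = queue.write_head.load(Ordering::Acquire);
    loop {
        let next = (write_index + 1) & data_lock.compare_mask;
        match queue.write_head.compare_exchange(
            write_index, next, Ordering::AcqRel, Ordering::Acquire,
        ) {
            Ok(_) => break,
            Err(actual) => write_index = actual,
        }
    }

    // Write the value into the reserved physical slot.
    unsafe {
        let target = data_lock.data.get((write_index & data_lock.read_mask) as usize);
        std::ptr::write(target, value);
    }

    // Publish in order: wait until all earlier reservations are visible, then
    // move `limit_head` past our own slot.
    let next_index = (write_index + 1) & data_lock.compare_mask;
    while queue.limit_head.compare_exchange(
        write_index, next_index, Ordering::AcqRel, Ordering::Relaxed,
    ).is_err() {
        std::hint::spin_loop();
    }
}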
 

	
 
    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
 
        drop(shared_lock);
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let mut exclusive_lock = queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
                for idx in old_capacity..new_capacity {
 
                    unsafe {
 
                        let target = exclusive_lock.data.get(idx);
 
                        let source = exclusive_lock.data.get(idx - old_capacity);
 
                        std::ptr::write(target, std::ptr::read(source));
 
                    }
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer.
 
                
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
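The doubling strategy in `resize` above (copy the lower half into the upper half) keeps every already-reserved logical index pointing at the same element: a slot chosen through the old read mask and the slot chosen through the new, wider read mask hold identical values. A standalone check of that invariant with plain slices (the remaining step, updating the atomics and masks after the "Modify all atomics" comment, is still unimplemented in this commit):

fn resize_duplication_invariant() {
    let old_capacity: usize = 4; // example value
    let old_read_mask = old_capacity - 1;
    let old = [10, 20, 30, 40];

    // Resize by a factor of two and make the two halves identical.
    let new_capacity = 2 * old_capacity;
    let new_read_mask = new_capacity - 1;
    let mut new = vec![0; new_capacity];
    for idx in 0..new_capacity {
        new[idx] = old[idx & old_read_mask];
    }

    // Logical indices ranged over [0, 2 * old_capacity) before the resize;
    // each still dereferences to the same value afterwards.
    for logical in 0..(2 * old_capacity) {
        assert_eq!(old[logical & old_read_mask], new[logical & new_read_mask]);
    }
}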
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
 
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
 
// The producer end is `Send`, because in debug mode we make sure that there are

// no more producers left when the queue is destroyed. It is not `Sync`, because

// that would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 
\ No newline at end of file