Changeset - fafdf8723ee3
[Not reviewed]
MH - 2022-01-05 18:12:43
contact@maxhenger.nl
WIP: Busy with MPSC channel

Will act as backing buffer for messages (for now).
4 files changed with 96 insertions and 31 deletions:
src/collections/mod.rs
 
@@ -9,4 +9,5 @@ mod raw_array;
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::{DequeSet, VecSet};
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
 
pub(crate) use raw_vec::RawVec;
 
pub(crate) use raw_array::RawArray;
 
\ No newline at end of file
src/collections/raw_array.rs
 
@@ -67,8 +67,8 @@ impl<T> RawArray<T> {
 
        return self.data;
 
    }
 

	
 
    /// Returns the length of the array.
 
    pub fn len(&self) -> usize {
 
    /// Returns the capacity of the array.
 
    pub fn cap(&self) -> usize {
 
        return self.count;
 
    }
 

	
 
@@ -118,7 +118,7 @@ mod tests {
 
    #[test]
 
    fn drop_empty_array() {
 
        let array = RawArray::<u32>::new();
 
        assert_eq!(array.len(), 0);
 
        assert_eq!(array.cap(), 0);
 
        assert_eq!(array.data(), ptr::null_mut());
 
    }
 

	
 
@@ -133,7 +133,7 @@ mod tests {
 
        for grow_idx in 0..NUM_RESIZES {
 
            let new_size = INIT_SIZE + grow_idx * 4;
 
            array.resize(new_size);
 
            assert_eq!(array.len(), new_size);
 
            assert_eq!(array.cap(), new_size);
 
        }
 

	
 
        check(&array, INIT_SIZE);
 
@@ -149,7 +149,7 @@ mod tests {
 
        fill(&mut array, INIT_SIZE);
 
        for _idx in 0..NUM_RESIZES {
 
            array.resize(INIT_SIZE);
 
            assert_eq!(array.len(), INIT_SIZE);
 
            assert_eq!(array.cap(), INIT_SIZE);
 
        }
 
        check(&array, INIT_SIZE);
 
    }
 
@@ -177,7 +177,7 @@ mod tests {
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
            array.resize(size);
 
            assert_eq!(array.len(), size);
 
            assert_eq!(array.cap(), size);
 
        }
 
        check(&array, min_size);
 
    }
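
Taken together with the calls made later in this changeset, the tests above suggest that RawArray<T> is a manually managed buffer of uninitialized slots. A minimal usage sketch, assuming the methods visible in the diff (new, cap, data, resize, get) behave as the assertions imply:

    use crate::collections::RawArray;

    // Illustration only: slots start uninitialized, so all element access goes
    // through raw pointers and the caller is responsible for dropping values.
    fn raw_array_usage_sketch() {
        let mut array = RawArray::<u32>::new();
        assert_eq!(array.cap(), 0);                     // a fresh array owns no allocation
        assert_eq!(array.data(), std::ptr::null_mut());
        array.resize(8);                                // allocate room for 8 elements
        assert_eq!(array.cap(), 8);
        unsafe {
            std::ptr::write(array.get(0), 42u32);       // initialize a slot before reading it
            assert_eq!(std::ptr::read(array.get(0)), 42);
        }
    }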
src/runtime2/store/component.rs
 
@@ -213,6 +213,8 @@ impl<T: Sized> ComponentStore<T> {
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
 
        drop(inner);
 
        {
src/runtime2/store/queue_mpsc.rs
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock};
 
use crate::collections::RawArray;
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
@@ -13,7 +14,7 @@ use super::unfair_se_lock::{UnfairSeLock};
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
pub struct QueueDynMpsc<T: 'static> {
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
@@ -24,9 +25,9 @@ pub struct QueueDynMpsc<T: 'static> {
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T: 'static> {
 
struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
    read_head: u32,
 
    read_head: AtomicU32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
@@ -35,11 +36,13 @@ struct Shared<T: 'static> {
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
    data: Vec<T>,
 
    data: RawArray<T>,
 
    compare_mask: u32,
 
    read_mask: u32,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T> QueueDynMpsc<T> {
 
    /// Constructs a new MPSC queue. Note that the initial capacity is always
 
    /// increased to the next power of 2 (if it isn't already).
 
@@ -47,16 +50,21 @@ impl<T> QueueDynMpsc<T> {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        Self::assert_correct_size(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 

	
 
        let initial_capacity = initial_capacity as u32;
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
                    data: Vec::with_capacity(initial_capacity),
 
                    compare_mask: (2 * initial_capacity as u32) - 1,
 
                    read_mask: initial_capacity as u32 - 1,
 
                    data,
 
                    compare_mask: (2 * initial_capacity) - 1,
 
                    read_mask: initial_capacity - 1,
 
                }),
 
                read_head: 0,
 
                write_head: AtomicU32::new(0),
 
                limit_head: AtomicU32::new(0),
 
                read_head: AtomicU32::new(0),
 
                write_head: AtomicU32::new(initial_capacity),
 
                limit_head: AtomicU32::new(initial_capacity),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
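
As a worked example of these initial values (an illustration, not code from the changeset): a requested capacity of 5 rounds up to 8, the masks become 7 and 15, and starting write_head and limit_head one full capacity ahead of read_head makes the emptiness test in pop hold immediately.

    fn head_initialization_sketch() {
        let requested: u32 = 5;
        let capacity = requested.next_power_of_two();   // 8
        let compare_mask = 2 * capacity - 1;            // 15: heads live in [0, 16)
        let read_mask = capacity - 1;                   // 7: maps a head onto a slot
        assert_eq!((capacity, compare_mask, read_mask), (8, 15, 7));

        // read_head starts at 0, write_head and limit_head at `capacity`, so
        // the queue reads as empty right away.
        let (read_head, limit_head) = (0u32, capacity);
        assert_eq!((read_head + capacity) & compare_mask, limit_head);
    }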
 
@@ -70,17 +78,20 @@ impl<T> QueueDynMpsc<T> {
 
    /// Perform an attempted read from the queue. It might be that some producer
 
    /// is putting something in the queue while this function is executing, and
 
    /// we don't get to consume it.
 
    pub fn read(&mut self) -> Option<T> {
 
        let cur_read = self.inner.read_head;
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
    pub fn pop(&mut self) -> Option<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap();
 

	
 
        if cur_read != cur_limit {
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
 
                self.inner.read_head += 1;
 
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
 
                // We can perform a store since we're the only ones modifying
 
                // the atomic.
 
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(std::ptr::read(source));
 
            }
 
        } else {
 
@@ -99,19 +110,27 @@ impl<T> Drop for QueueDynMpsc<T> {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
        // have
 
        while self.inner.read_head != write_index {
 

	
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap();
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
                std::ptr::drop_in_place(target);
 
            }
 
            read_index += 1;
 
        }
 
    }
 
}
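
To make the increment-by-size trick concrete, a small numeric sketch (illustration only): with a capacity of 4 the heads live in [0, 8). Suppose four values were pushed and two popped, so read_head is 2 and write_head has wrapped to 0; walking the incremented read index forward visits exactly the two unread slots before it hits the write head.

    fn drop_drain_sketch() {
        let capacity: u32 = 4;
        let compare_mask = 2 * capacity - 1;            // 7
        let read_mask = capacity - 1;                   // 3
        let write_head: u32 = 0;                        // 4 (initial) + 4 pushes, wrapped under the mask
        let read_head: u32 = 2;                         // two values were popped

        let mut read_index = read_head + capacity;      // the increment-by-size trick
        let mut visited = Vec::new();
        while read_index & compare_mask != write_head {
            visited.push(read_index & read_mask);       // where drop_in_place would run
            read_index += 1;
        }
        assert_eq!(visited, vec![2, 3]);                // exactly the two unread slots
    }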
 

	
 
pub struct QueueDynProducer<T: 'static> {
 
    queue: &'static Shared<T>,
 
pub struct QueueDynProducer<T> {
 
    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
@@ -120,15 +139,58 @@ impl<T> QueueDynProducer<T> {
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
 
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let data_lock = queue.data.lock_shared();
 
        let read_index = queue.read_head.load(Ordering::Acquire);
 
        let write_index = queue.write_head.load(Ordering::Acquire);
 
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
            let expected_capacity = data_lock.data.cap();
 
        }
 
    }
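
The comparison above relies on the same trick: because both heads are kept in [0, 2*capacity) and write_head starts one full capacity ahead of read_head, the two only become equal once the buffer holds capacity unread values. A small illustration (not code from the changeset):

    fn full_check_sketch() {
        let capacity: u32 = 4;
        let compare_mask = 2 * capacity - 1;            // 7
        let read_head: u32 = 0;
        let mut write_head = capacity;                  // starts one capacity ahead
        for _ in 0..capacity {
            write_head = (write_head + 1) & compare_mask;   // 5, 6, 7, 0
        }
        // Four unread values with capacity 4: the heads meet, so push would
        // have to grow the buffer before storing a fifth value.
        assert_eq!(write_head, read_head);
    }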
 

	
 
    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
 
        drop(shared_lock);
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let exclusive_lock = self.queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
                for idx in old_capacity..new_capacity {
 
                    unsafe {
 
                        let target = exclusive_lock.data.get(idx);
 
                        let source = exclusive_lock.data.get(idx - old_capacity);
 
                        std::ptr::write(target, std::ptr::read(source));
 
                    }
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer.
 
                
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
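
The mirror copy in resize keeps in-flight indices valid: a head value in [0, 2 * old_capacity) must map to the same element under the doubled read mask, and duplicating the first half into the second half guarantees that. A worked illustration (numbers only, not code from the changeset):

    fn mirror_copy_sketch() {
        let old_capacity: u32 = 4;
        let new_capacity = 2 * old_capacity;
        // A value reachable at logical index 5 sits in slot 5 & 3 == 1 before the
        // resize and is looked up at slot 5 & 7 == 5 afterwards. Copying slots
        // [0, 4) into [4, 8) places an identical value at both positions, so the
        // lookup succeeds under either mask.
        let logical_index: u32 = 5;
        assert_eq!(logical_index & (old_capacity - 1), 1);
        assert_eq!(logical_index & (new_capacity - 1), 5);
    }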
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
 
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
0 comments (0 inline, 0 general)