Changeset - 75e767adaaf7
[Not reviewed]
MH - 3 years ago 2022-01-05 17:20:42
contact@maxhenger.nl
Add a new RawArray implementation

Needed to implement ringbuffer queues in a neat way. Will deprecate the
old RawVec implementation
5 files changed with 336 insertions and 1 deletion:
0 comments (0 inline, 0 general)
src/collections/mod.rs
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod raw_vec;
 
mod raw_vec; // TODO: Delete?
 
mod raw_array;
 

	
 
// mod freelist;
 

	
src/collections/raw_array.rs
 
new file 100644
 
use std::{mem, ptr};
 
use std::alloc::{Layout, alloc, dealloc};
 

	
 
/// Very simple resizable array. Doesn't call destructors or anything. Just
 
/// makes sure that the array is cleaned up when dropped, and allows the user
 
/// to ergonomically resize. Assumes that `size_of::<T>() != 0` (and checks this
 
/// in debug mode).
 
pub struct RawArray<T> {
 
    data: *mut T,
 
    count: usize,
 
}
 

	
 
impl<T> RawArray<T> {
 
    const SIZE: usize = mem::size_of::<T>();
 
    const ALIGNMENT: usize = mem::align_of::<T>();
 

	
 
    /// Constructs a new and empty (not allocated) array.
 
    pub fn new() -> Self {
 
        return Self{
 
            data: ptr::null_mut(),
 
            count: 0,
 
        }
 
    }
 

	
 
    /// Resizes the array. All elements that fit within the new size are
    /// preserved (whether the contained bytes are bogus or not).
 
    pub fn resize(&mut self, new_count: usize) {
 
        if new_count > self.count {
 
            let new_data = Self::allocate(new_count);
 
            if !self.data.is_null() {
 
                unsafe {
 
                    ptr::copy_nonoverlapping(self.data, new_data, self.count);
 
                    Self::deallocate(self.data, self.count);
 
                }
 
            }
 

	
 
            self.data = new_data;
 
            self.count = new_count;
 
        } else if new_count < self.count {
 
            // `new_count < count` implies `count > 0`, hence `data != null`.
 
            if new_count == 0 {
 
                Self::deallocate(self.data, self.count);
 
                self.data = ptr::null_mut();
 
                self.count = 0;
 
            } else {
 
                let new_data = Self::allocate(new_count);
 
                unsafe {
 
                    ptr::copy_nonoverlapping(self.data, new_data, new_count);
 
                    Self::deallocate(self.data, self.count);
 
                }
 
                self.data = new_data;
 
                self.count = new_count;
 
            }
 
        } // otherwise: equal
 
    }
 

	
 
    /// Retrieves a mutable pointer to the value at the specified index. The
    /// returned `*mut T` may point to bogus uninitialized memory.
 
    pub fn get(&self, index: usize) -> *mut T {
 
        debug_assert!(index < self.count); // at least some safety, amirite?
 
        return unsafe{ self.data.add(index) };
 
    }
 

	
 
    /// Retrieves the base pointer of the array. Hence it may be null, or may
    /// point to bogus uninitialized memory.
 
    pub fn data(&self) -> *mut T {
 
        return self.data;
 
    }
 

	
 
    /// Returns the length of the array.
 
    pub fn len(&self) -> usize {
 
        return self.count;
 
    }
 

	
 
    fn allocate(count: usize) -> *mut T {
 
        debug_assert_ne!(Self::SIZE, 0);
 
        let size = count * Self::SIZE;
 
        unsafe {
 
            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
 
            let data = alloc(layout);
 
            return mem::transmute(data);
 
        }
 
    }
 

	
 
    fn deallocate(data: *mut T, count: usize) {
 
        let size = count * Self::SIZE;
 
        unsafe {
 
            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
 
            dealloc(mem::transmute(data), layout);
 
        }
 
    }
 
}
 

	
 
impl<T> Drop for RawArray<T> {
 
    fn drop(&mut self) {
 
        if !self.data.is_null() {
 
            Self::deallocate(self.data, self.count);
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 

	
 
    fn fill(array: &mut RawArray<usize>, count: usize) {
 
        for idx in 0..count {
 
            unsafe{ *array.get(idx) = idx }
 
        }
 
    }
 

	
 
    fn check(array: &RawArray<usize>, count: usize) {
 
        for idx in 0..count {
 
            assert_eq!(unsafe{ *array.get(idx) }, idx);
 
        }
 
    }
 

	
 
    #[test]
 
    fn drop_empty_array() {
 
        let array = RawArray::<u32>::new();
 
        assert_eq!(array.len(), 0);
 
        assert_eq!(array.data(), ptr::null_mut());
 
    }
 

	
 
    #[test]
 
    fn increase_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 

	
 
        for grow_idx in 0..NUM_RESIZES {
 
            let new_size = INIT_SIZE + grow_idx * 4;
 
            array.resize(new_size);
 
            assert_eq!(array.len(), new_size);
 
        }
 

	
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn maintain_size() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_RESIZES: usize = 4;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        for _idx in 0..NUM_RESIZES {
 
            array.resize(INIT_SIZE);
 
            assert_eq!(array.len(), INIT_SIZE);
 
        }
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn decrease_size() {
 
        const INIT_SIZE: usize = 16;
 
        const FINAL_SIZE: usize = 8;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        array.resize(FINAL_SIZE);
 
        check(&array, FINAL_SIZE);
 
    }
 

	
 
    #[test]
 
    fn increase_and_decrease_size() {
 
        let sizes = [12, 8, 6, 150, 128, 32, 16, 90, 4, 18, 27];
 
        let min_size = *sizes.iter().min().unwrap();
 
        let max_size = *sizes.iter().max().unwrap();
 

	
 
        let mut array = RawArray::new();
 
        array.resize(max_size);
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
            array.resize(size);
 
            assert_eq!(array.len(), size);
 
        }
 
        check(&array, min_size);
 
    }
 
}
 
\ No newline at end of file
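The commit message says this RawArray is meant to back ring-buffer queues. Below is a minimal sketch of that kind of use, relying only on the `new`/`resize`/`get`/`len` API added above; the `RingU32` wrapper, its fields and the import path are made up for illustration and are not part of this changeset.

// Hypothetical fixed-capacity ring of `u32` values on top of `RawArray`.
// `RawArray` never runs destructors, which is fine for a `Copy` payload.
// The import path assumes `raw_array` is reachable from the example's module.
use crate::collections::raw_array::RawArray;

struct RingU32 {
    storage: RawArray<u32>,
    read: usize,   // total number of values popped so far
    write: usize,  // total number of values pushed so far
}

impl RingU32 {
    fn with_capacity(capacity: usize) -> Self {
        let mut storage = RawArray::new();
        storage.resize(capacity); // slots start out as bogus bytes
        return Self{ storage, read: 0, write: 0 };
    }

    fn push(&mut self, value: u32) -> bool {
        if self.write - self.read == self.storage.len() {
            return false; // full (also covers a zero-capacity ring)
        }
        unsafe { *self.storage.get(self.write % self.storage.len()) = value; }
        self.write += 1;
        return true;
    }

    fn pop(&mut self) -> Option<u32> {
        if self.read == self.write {
            return None; // empty
        }
        let value = unsafe { *self.storage.get(self.read % self.storage.len()) };
        self.read += 1;
        return Some(value);
    }
}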
src/macros.rs
 
// Utility for performing debug printing within a particular module. Still
 
// requires some extra macros to be defined to be ergonomic.
 
macro_rules! enabled_debug_print {
 
    (false, $name:literal, $format:literal) => {};
 
    (false, $name:literal, $format:literal, $($args:expr),*) => {};
 
@@ -7,4 +9,13 @@ macro_rules! enabled_debug_print {
 
    (true, $name:literal, $format:literal, $($args:expr),*) => {
 
        println!("[{}] {}", $name, format!($format, $($args),*))
 
    };
 
}
 

	
 
// Utility for inserting code only executed in debug mode. Because writing the
 
// conditional cfg is tedious and looks ugly. Still doesn't work for struct
 
// fields, though.
 
macro_rules! dbg_code {
 
    ($code:stmt) => {
 
        #[cfg(debug_assertions)] $code
 
    }
 
}
 
\ No newline at end of file
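For reference, a sketch of how these two macros might be wrapped and used inside a module; the `debug_conn_print!` name, the "connector" label and the `example` function are made up, while the `dbg_code!` call mirrors how it is used in queue_mpsc.rs below.

// Hypothetical module-local wrapper: flipping the first argument to `false`
// compiles all of these debug prints away.
macro_rules! debug_conn_print {
    ($($args:tt)*) => { enabled_debug_print!(true, "connector", $($args)*); };
}

fn example(value: u32) {
    // Prints "[connector] value is 5" while the wrapper's flag is `true`.
    debug_conn_print!("value is {}", value);

    // The statement is only compiled in debug builds (`#[cfg(debug_assertions)]`).
    dbg_code!(assert!(value < 100));
}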
src/runtime2/store/mod.rs
 
pub mod component;
 
pub mod unfair_se_lock;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
src/runtime2/store/queue_mpsc.rs
 
new file 100644
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
pub struct QueueDynMpsc<T: 'static> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// One may move around the queue between threads, as long as there is only one
 
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T: 'static> {
 
    data: UnfairSeLock<Inner<T>>,
 
    read_head: u32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
}
 

	
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
    data: Vec<T>,
 
    compare_mask: u32,
 
    read_mask: u32,
 
}
 

	
 
impl<T> QueueDynMpsc<T> {
 
    /// Constructs a new MPSC queue. Note that the initial capacity is always
 
    /// increased to the next power of 2 (if it isn't already).
 
    pub fn new(initial_capacity: usize) -> Self {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        Self::assert_correct_size(initial_capacity);
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
                    data: Vec::with_capacity(initial_capacity),
 
                    compare_mask: (2 * initial_capacity as u32) - 1,
 
                    read_mask: initial_capacity as u32 - 1,
 
                }),
 
                read_head: 0,
 
                write_head: AtomicU32::new(0),
 
                limit_head: AtomicU32::new(0),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
    }
 

	
 
    /// Performs an attempted read from the queue. It might be that some producer
    /// is putting something in the queue while this function is executing, and
    /// we don't get to consume it.
 
    pub fn read(&mut self) -> Option<T> {
 
        let cur_read = self.inner.read_head;
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let data_lock = self.inner.data.lock_shared();
 

	
 
        if cur_read != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
 
                self.inner.read_head += 1;
 
                return Some(std::ptr::read(source));
 
            }
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn assert_correct_size(capacity: usize) {
 
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
    }
 
}
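// Illustrative note, not part of the original commit: with an initial capacity
// of 8, `read_mask` is 7 and `compare_mask` is 15. A head value is turned into
// a slot index with `head & read_mask`, while `compare_mask` (presumably used
// by the producer-side write path, which is not in this diff) lets the heads
// run over twice the capacity before wrapping. That extra wrap bit is the
// usual trick for telling a full queue (heads differ by the capacity) apart
// from an empty one (heads equal) without sacrificing a slot.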
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
    fn drop(&mut self) {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
        // have its destructor run. Draining through `read` makes the bitwise
        // copy; dropping the returned value here runs that destructor.
        while self.inner.read_head != write_index {
            let item = self.read();
            debug_assert!(item.is_some());
        }
 
    }
 
}
 

	
 
pub struct QueueDynProducer<T: 'static> {
 
    queue: &'static Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
 
    }
 
}
 

	
 
// The producer end is `Send`, because in debug mode we make sure that there are
// no more producers when the queue is destroyed. But it is not `Sync`, because
// that would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 
\ No newline at end of file
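Finally, a minimal consumer-side sketch of the queue API added above. Only `new`, `producer` and `read` appear in this changeset (the producer's write path is not part of this diff), so nothing is actually pushed here, and the import path is assumed.

use crate::runtime2::store::queue_mpsc::QueueDynMpsc;

fn example() {
    // The requested capacity is rounded up to the next power of two (8 stays 8).
    let mut queue = QueueDynMpsc::<u64>::new(8);

    // Producer handles can be created and handed to other threads; in debug
    // builds the `dbg` counter in `Shared` tracks how many are alive.
    let producer = queue.producer();

    // Nothing has been written yet, so an attempted read yields `None`.
    assert!(queue.read().is_none());

    // All producers must be dropped before the queue itself, otherwise the
    // debug assertion in `Drop for QueueDynMpsc` fires.
    drop(producer);
}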