diff --git a/src/collections/mod.rs b/src/collections/mod.rs
index df772f4aea8eb0269a93012cd581299a8ebe9830..71a6365d0935dd44ab62aaf0d44b38282012b2fa 100644
--- a/src/collections/mod.rs
+++ b/src/collections/mod.rs
@@ -1,7 +1,8 @@
 mod string_pool;
 mod scoped_buffer;
 mod sets;
-mod raw_vec;
+mod raw_vec; // TODO: Delete?
+mod raw_array;
 
 // mod freelist;
 
diff --git a/src/collections/raw_array.rs b/src/collections/raw_array.rs
new file mode 100644
index 0000000000000000000000000000000000000000..93fa6d7b25f5a2bcae741b1716e6087deb2e2982
--- /dev/null
+++ b/src/collections/raw_array.rs
@@ -0,0 +1,184 @@
+use std::{mem, ptr};
+use std::alloc::{Layout, alloc, dealloc};
+
+/// Very simple resizable array. Doesn't call destructors or anything. Just
+/// makes sure that the array is cleaned up when dropped, and allows the user
+/// to ergonomically resize. Assumes that `size_of::<T>() != 0` (and checks this
+/// in debug mode).
+pub struct RawArray<T> {
+    data: *mut T,
+    count: usize,
+}
+
+impl<T> RawArray<T> {
+    const SIZE: usize = mem::size_of::<T>();
+    const ALIGNMENT: usize = mem::align_of::<T>();
+
+    /// Constructs a new and empty (not yet allocated) array.
+    pub fn new() -> Self {
+        return Self{
+            data: ptr::null_mut(),
+            count: 0,
+        }
+    }
+
+    /// Resizes the array. All elements that fit in the new size are preserved
+    /// (whether the contained bytes are bogus or not).
+    pub fn resize(&mut self, new_count: usize) {
+        if new_count > self.count {
+            let new_data = Self::allocate(new_count);
+            if !self.data.is_null() {
+                unsafe {
+                    ptr::copy_nonoverlapping(self.data, new_data, self.count);
+                    Self::deallocate(self.data, self.count);
+                }
+            }
+
+            self.data = new_data;
+            self.count = new_count;
+        } else if new_count < self.count {
+            // `new_count < count` and `new_count >= 0`, so `data != null`.
+            if new_count == 0 {
+                Self::deallocate(self.data, self.count);
+                self.data = ptr::null_mut();
+                self.count = 0;
+            } else {
+                let new_data = Self::allocate(new_count);
+                unsafe {
+                    ptr::copy_nonoverlapping(self.data, new_data, new_count);
+                    Self::deallocate(self.data, self.count);
+                }
+                self.data = new_data;
+                self.count = new_count;
+            }
+        } // otherwise: equal
+    }
+
+    /// Retrieves a mutable pointer to the value at the specified index. The
+    /// returned `*mut T` may point to bogus uninitialized memory.
+    pub fn get(&self, index: usize) -> *mut T {
+        debug_assert!(index < self.count); // at least some safety, amirite?
+        return unsafe{ self.data.add(index) };
+    }
+
+    /// Retrieves the base pointer of the array. Hence it may be null, or may
+    /// point to bogus uninitialized memory.
+    pub fn data(&self) -> *mut T {
+        return self.data;
+    }
+
+    /// Returns the length of the array.
+    pub fn len(&self) -> usize {
+        return self.count;
+    }
+
+    fn allocate(count: usize) -> *mut T {
+        debug_assert_ne!(Self::SIZE, 0);
+        let size = count * Self::SIZE;
+        unsafe {
+            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
+            let data = alloc(layout);
+            return mem::transmute(data);
+        }
+    }
+
+    fn deallocate(data: *mut T, count: usize) {
+        let size = count * Self::SIZE;
+        unsafe {
+            let layout = Layout::from_size_align_unchecked(size, Self::ALIGNMENT);
+            dealloc(mem::transmute(data), layout);
+        }
+    }
+}
+
+impl<T> Drop for RawArray<T> {
+    fn drop(&mut self) {
+        if !self.data.is_null() {
+            Self::deallocate(self.data, self.count);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn fill(array: &mut RawArray<usize>, count: usize) {
+        for idx in 0..count {
+            unsafe{ *array.get(idx) = idx }
+        }
+    }
+
+    fn check(array: &RawArray<usize>, count: usize) {
+        for idx in 0..count {
+            assert_eq!(unsafe{ *array.get(idx) }, idx);
+        }
+    }
+
+    #[test]
+    fn drop_empty_array() {
+        let array = RawArray::<usize>::new();
+        assert_eq!(array.len(), 0);
+        assert_eq!(array.data(), ptr::null_mut());
+    }
+
+    #[test]
+    fn increase_size() {
+        const INIT_SIZE: usize = 16;
+        const NUM_RESIZES: usize = 4;
+        let mut array = RawArray::new();
+        array.resize(INIT_SIZE);
+        fill(&mut array, INIT_SIZE);
+
+        for grow_idx in 0..NUM_RESIZES {
+            let new_size = INIT_SIZE + grow_idx * 4;
+            array.resize(new_size);
+            assert_eq!(array.len(), new_size);
+        }
+
+        check(&array, INIT_SIZE);
+    }
+
+    #[test]
+    fn maintain_size() {
+        const INIT_SIZE: usize = 16;
+        const NUM_RESIZES: usize = 4;
+
+        let mut array = RawArray::new();
+        array.resize(INIT_SIZE);
+        fill(&mut array, INIT_SIZE);
+        for _idx in 0..NUM_RESIZES {
+            array.resize(INIT_SIZE);
+            assert_eq!(array.len(), INIT_SIZE);
+        }
+        check(&array, INIT_SIZE);
+    }
+
+    #[test]
+    fn decrease_size() {
+        const INIT_SIZE: usize = 16;
+        const FINAL_SIZE: usize = 8;
+
+        let mut array = RawArray::new();
+        array.resize(INIT_SIZE);
+        fill(&mut array, INIT_SIZE);
+        array.resize(FINAL_SIZE);
+        check(&array, FINAL_SIZE);
+    }
+
+    #[test]
+    fn increase_and_decrease_size() {
+        let sizes = [12, 8, 6, 150, 128, 32, 16, 90, 4, 18, 27];
+        let min_size = *sizes.iter().min().unwrap();
+        let max_size = *sizes.iter().max().unwrap();
+
+        let mut array = RawArray::new();
+        array.resize(max_size);
+        fill(&mut array, max_size);
+        for size in sizes {
+            array.resize(size);
+            assert_eq!(array.len(), size);
+        }
+        check(&array, min_size);
+    }
+}
\ No newline at end of file
diff --git a/src/macros.rs b/src/macros.rs
index 2d27e951b6bcd4f6cace5a6df3824f08d3aa3957..7777d1438c26ff8c770480bee68c179d5490e364 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -1,3 +1,5 @@
+// Utility for performing debug printing within a particular module. Still
+// requires some extra macros to be defined to be ergonomic.
 macro_rules! enabled_debug_print {
     (false, $name:literal, $format:literal) => {};
     (false, $name:literal, $format:literal, $($args:expr),*) => {};
@@ -7,4 +9,13 @@ macro_rules! enabled_debug_print {
     (true, $name:literal, $format:literal, $($args:expr),*) => {
         println!("[{}] {}", $name, format!($format, $($args),*))
     };
+}
+
+// Utility for inserting code that is only executed in debug mode, because
+// writing the conditional cfg is tedious and looks ugly. Still doesn't work
+// for struct fields, though.
+macro_rules! dbg_code {
+    ($code:stmt) => {
+        #[cfg(debug_assertions)] $code
+    }
 }
\ No newline at end of file
diff --git a/src/runtime2/store/mod.rs b/src/runtime2/store/mod.rs
index 5746a4d669dabde8713683eb0178160e1cb3c809..f5ee1f0a39c16b8dfb05cd55cc8a62d5f85e0abd 100644
--- a/src/runtime2/store/mod.rs
+++ b/src/runtime2/store/mod.rs
@@ -1,4 +1,5 @@
 pub mod component;
 pub mod unfair_se_lock;
+pub mod queue_mpsc;
 
 pub(crate) use component::ComponentStore;
diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs
new file mode 100644
index 0000000000000000000000000000000000000000..c93f1c96dd2bf2f052452376134b452cd59be796
--- /dev/null
+++ b/src/runtime2/store/queue_mpsc.rs
@@ -0,0 +1,138 @@
+use std::sync::atomic::{AtomicU32, Ordering};
+
+use super::unfair_se_lock::{UnfairSeLock};
+
+/// Multiple-producer single-consumer queue. Generally used in the publicly
+/// accessible fields of a component. The holder of this struct should be the
+/// consumer. To retrieve access to the producer side, call `producer()`.
+///
+/// This is a queue that will resize (indefinitely) if it becomes full, and
+/// will not shrink. So it is probably a temporary thing.
+///
+/// In debug mode we'll make sure that there are no producers left when the
+/// queue is dropped. We don't do this in release mode because the runtime is
+/// written such that components always remain alive (hence, this queue will
+/// remain accessible) while there are references to it.
+pub struct QueueDynMpsc<T> {
+    // Entire contents are boxed up such that we can create producers that have
+    // a pointer to the contents.
+    inner: Box<Shared<T>>
+}
+
+// One may move the queue between threads, as long as there is only one
+// instance of it.
+unsafe impl<T> Send for QueueDynMpsc<T>{}
+
+/// Shared data between the queue consumer and the queue producers.
+struct Shared<T> {
+    data: UnfairSeLock<Inner<T>>,
+    read_head: u32,
+    write_head: AtomicU32,
+    limit_head: AtomicU32,
+    #[cfg(debug_assertions)] dbg: AtomicU32,
+}
+
+/// Protected by an exclusive/shared lock. The exclusive lock is obtained when
+/// the inner data array is resized.
+struct Inner<T> {
+    data: Vec<T>,
+    compare_mask: u32,
+    read_mask: u32,
+}
+
+impl<T> QueueDynMpsc<T> {
+    /// Constructs a new MPSC queue. Note that the initial capacity is always
+    /// increased to the next power of 2 (if it isn't already).
+    pub fn new(initial_capacity: usize) -> Self {
+        let initial_capacity = initial_capacity.next_power_of_two();
+        Self::assert_correct_size(initial_capacity);
+
+        return Self{
+            inner: Box::new(Shared {
+                data: UnfairSeLock::new(Inner{
+                    data: Vec::with_capacity(initial_capacity),
+                    compare_mask: (2 * initial_capacity as u32) - 1,
+                    read_mask: initial_capacity as u32 - 1,
+                }),
+                read_head: 0,
+                write_head: AtomicU32::new(0),
+                limit_head: AtomicU32::new(0),
+                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
+            }),
+        };
+    }
+
+    #[inline]
+    pub fn producer(&self) -> QueueDynProducer<T> {
+        return QueueDynProducer::new(self);
+    }
+
+    /// Performs an attempted read from the queue. It might be that some
+    /// producer is putting something into the queue while this function is
+    /// executing, in which case we don't get to consume it.
+    pub fn read(&mut self) -> Option<T> {
+        let cur_read = self.inner.read_head;
+        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+        let data_lock = self.inner.data.lock_shared();
+
+        if cur_read != cur_limit {
+            // Make a bitwise copy of the value and return it. The receiver is
+            // responsible for dropping it.
+            unsafe {
+                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
+                self.inner.read_head += 1;
+                return Some(std::ptr::read(source));
+            }
+        } else {
+            return None;
+        }
+    }
+
+    #[inline]
+    fn assert_correct_size(capacity: usize) {
+        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
+    }
+}
+
+impl<T> Drop for QueueDynMpsc<T> {
+    fn drop(&mut self) {
+        // There should be no more `QueueDynProducer` pointers to this queue.
+        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
+        // And so the limit head should be equal to the write head.
+        let write_index = self.inner.write_head.load(Ordering::Acquire);
+        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
+
+        // Every item that has not yet been taken out of the queue needs to
+        // be dropped here, because `read` only hands out bitwise copies.
+        let data_lock = self.inner.data.lock_shared();
+        let mut read_head = self.inner.read_head;
+        while read_head != write_index {
+            unsafe {
+                let target = data_lock.data.as_ptr().add((read_head & data_lock.read_mask) as usize);
+                std::ptr::drop_in_place(target as *mut T);
+            }
+            read_head += 1;
+        }
+    }
+}
+
+pub struct QueueDynProducer<T> {
+    queue: &'static Shared<T>,
+}
+
+impl<T> QueueDynProducer<T> {
+    fn new(consumer: &QueueDynMpsc<T>) -> Self {
+        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
+        unsafe {
+            // If you only knew the power of the dark side! Obi-Wan never told
+            // you what happened to your father!
+            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
+            return Self{ queue };
+        }
+    }
+}
+
+impl<T> Drop for QueueDynProducer<T> {
+    fn drop(&mut self) {
+        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
+    }
+}
+
+// The producer end is `Send`, because in debug mode we make sure that there
+// are no more producers when the queue is destroyed. But it is not `Sync`,
+// because that would circumvent our atomic counter shenanigans.
+unsafe impl<T> Send for QueueDynProducer<T>{}
\ No newline at end of file
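
A minimal usage sketch of the new `RawArray`, for review purposes only (not part of the patch): the function name and the `u64` element type are made up, and the snippet assumes it lives somewhere the currently private `raw_array` module is reachable. It shows the intended contract: the array only manages the allocation, while the caller initializes, reads, and (if needed) drops elements through the raw pointers.

    use crate::collections::raw_array::RawArray;

    fn raw_array_demo() {
        let mut array: RawArray<u64> = RawArray::new();
        array.resize(4); // allocates storage for 4 u64 values; the bytes start out uninitialized

        // The caller initializes slots through the raw pointers; `RawArray`
        // never runs constructors or destructors for its elements.
        for idx in 0..4 {
            unsafe { std::ptr::write(array.get(idx), idx as u64) };
        }

        array.resize(8); // grows; the first 4 values are copied over bit-for-bit
        assert_eq!(unsafe { std::ptr::read(array.get(2)) }, 2);

        // Dropping `array` frees the allocation, but running element
        // destructors (if any) remains the caller's responsibility.
    }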
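The new `dbg_code!` macro is just shorthand for tagging a single statement with `#[cfg(debug_assertions)]`. Roughly, the invocation in `QueueDynMpsc::drop` above expands as follows (the `dbg` field itself only exists in debug builds, which is why the access has to be compiled out as well):

    // As written in the patch:
    dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));

    // Roughly what it expands to:
    #[cfg(debug_assertions)]
    assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0);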
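The power-of-two capacity lets the queue use bit masks instead of modulo arithmetic on its ever-increasing `u32` heads: `read_mask` maps a head onto a slot in `data`. `compare_mask` is not used in this patch; presumably the producer side (not included here) compares heads over a range of twice the capacity so that a full queue can be told apart from an empty one. A worked example, assuming an initial capacity of 8 (the function name is illustrative):

    fn mask_example() {
        let capacity: u32 = 8;               // e.g. next_power_of_two(6) == 8
        let read_mask = capacity - 1;        // 0b0111
        let compare_mask = 2 * capacity - 1; // 0b1111

        let read_head: u32 = 11;                  // heads keep counting up; masks do the wrapping
        assert_eq!(read_head & read_mask, 3);     // the 12th element ever read lives in slot 3
        assert_eq!(read_head & compare_mask, 11); // head comparisons happen modulo 16, not 8
    }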
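Finally, a sketch of the consumer-side flow using only the API present in this patch (the producer's write path is not part of the diff, so nothing is actually enqueued here; the function name is illustrative):

    fn queue_demo() {
        let mut queue = QueueDynMpsc::<u64>::new(6); // capacity is rounded up to 8
        let producer = queue.producer();             // may be handed to another thread (it is Send)

        assert!(queue.read().is_none());             // nothing has been written yet

        drop(producer); // all producers must be gone before the consumer drops the queue;
        drop(queue);    // in debug builds the `dbg` counter in `Shared` enforces this
    }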