diff --git a/src/collections/mod.rs b/src/collections/mod.rs
index 71a6365d0935dd44ab62aaf0d44b38282012b2fa..06d9f12d6a8eef9f2db9e60867c520e59ea70ab6 100644
--- a/src/collections/mod.rs
+++ b/src/collections/mod.rs
@@ -9,4 +9,5 @@ mod raw_array;
 pub(crate) use string_pool::{StringPool, StringRef};
 pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 pub(crate) use sets::{DequeSet, VecSet};
-pub(crate) use raw_vec::RawVec;
\ No newline at end of file
+pub(crate) use raw_vec::RawVec;
+pub(crate) use raw_array::RawArray;
\ No newline at end of file
diff --git a/src/collections/raw_array.rs b/src/collections/raw_array.rs
index 93fa6d7b25f5a2bcae741b1716e6087deb2e2982..6773f70e26ca5d08a298111c3a4ef0c8ba3f41cb 100644
--- a/src/collections/raw_array.rs
+++ b/src/collections/raw_array.rs
@@ -67,8 +67,8 @@ impl RawArray {
         return self.data;
     }

-    /// Returns the length of the array.
-    pub fn len(&self) -> usize {
+    /// Returns the capacity of the array.
+    pub fn cap(&self) -> usize {
         return self.count;
     }

@@ -118,7 +118,7 @@ mod tests {
     #[test]
     fn drop_empty_array() {
         let array = RawArray::<u32>::new();
-        assert_eq!(array.len(), 0);
+        assert_eq!(array.cap(), 0);
         assert_eq!(array.data(), ptr::null_mut());
     }

@@ -133,7 +133,7 @@
         for grow_idx in 0..NUM_RESIZES {
             let new_size = INIT_SIZE + grow_idx * 4;
             array.resize(new_size);
-            assert_eq!(array.len(), new_size);
+            assert_eq!(array.cap(), new_size);
         }

         check(&array, INIT_SIZE);
@@ -149,7 +149,7 @@
         fill(&mut array, INIT_SIZE);
         for _idx in 0..NUM_RESIZES {
             array.resize(INIT_SIZE);
-            assert_eq!(array.len(), INIT_SIZE);
+            assert_eq!(array.cap(), INIT_SIZE);
         }
         check(&array, INIT_SIZE);
     }
@@ -177,7 +177,7 @@
         fill(&mut array, max_size);
         for size in sizes {
             array.resize(size);
-            assert_eq!(array.len(), size);
+            assert_eq!(array.cap(), size);
         }
         check(&array, min_size);
     }
diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs
index 12c56123b5ca25e899ce0ddd1ad8bb72747f72de..d93245908fd4587e882f148afe5db432006579bd 100644
--- a/src/runtime2/store/component.rs
+++ b/src/runtime2/store/component.rs
@@ -213,6 +213,8 @@ impl ComponentStore {
         unsafe{ ptr::drop_in_place(target_ptr); }
     }

+    // NOTE: Bit of a mess, and could use a cleanup with better logic for the
+    // resizing. Maybe even a different indexing scheme...
     fn reallocate(&self, old_size: usize, inner: InnerRead) -> InnerRead {
         drop(inner);
         {
diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs
index c93f1c96dd2bf2f052452376134b452cd59be796..edcdd72dd0e56436beeebba6a621652d0e102910 100644
--- a/src/runtime2/store/queue_mpsc.rs
+++ b/src/runtime2/store/queue_mpsc.rs
@@ -1,6 +1,7 @@
 use std::sync::atomic::{AtomicU32, Ordering};

-use super::unfair_se_lock::{UnfairSeLock};
+use crate::collections::RawArray;
+use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

 /// Multiple-producer single-consumer queue. Generally used in the publicly
 /// accessible fields of a component. The holder of this struct should be the
@@ -13,7 +14,7 @@
 /// dropped. We don't do this in release mode because the runtime is written
 /// such that components always remain alive (hence, this queue will remain
 /// accessible) while there are references to it.
-pub struct QueueDynMpsc<T> {
+pub struct QueueDynMpsc<T: 'static> {
     // Entire contents are boxed up such that we can create producers that have
     // a pointer to the contents.
     inner: Box<Shared<T>>
@@ -24,9 +25,9 @@ pub struct QueueDynMpsc {
 unsafe impl<T> Send for QueueDynMpsc<T>{}

 /// Shared data between queue consumer and the queue producers
-struct Shared<T> {
+struct Shared<T: 'static> {
     data: UnfairSeLock<Inner<T>>,
-    read_head: u32,
+    read_head: AtomicU32,
     write_head: AtomicU32,
     limit_head: AtomicU32,
     #[cfg(debug_assertions)] dbg: AtomicU32,
@@ -35,11 +36,13 @@
 /// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 /// inner data array is resized.
 struct Inner<T> {
-    data: Vec<T>,
+    data: RawArray<T>,
     compare_mask: u32,
     read_mask: u32,
 }

+type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
+
 impl<T> QueueDynMpsc<T> {
     /// Constructs a new MPSC queue. Note that the initial capacity is always
     /// increased to the next power of 2 (if it isn't already).
@@ -47,16 +50,21 @@
         let initial_capacity = initial_capacity.next_power_of_two();
         Self::assert_correct_size(initial_capacity);

+        let mut data = RawArray::new();
+        data.resize(initial_capacity);
+
+        let initial_capacity = initial_capacity as u32;
+
         return Self{
             inner: Box::new(Shared {
                 data: UnfairSeLock::new(Inner{
-                    data: Vec::with_capacity(initial_capacity),
-                    compare_mask: (2 * initial_capacity as u32) - 1,
-                    read_mask: initial_capacity as u32 - 1,
+                    data,
+                    compare_mask: (2 * initial_capacity) - 1,
+                    read_mask: initial_capacity - 1,
                 }),
-                read_head: 0,
-                write_head: AtomicU32::new(0),
-                limit_head: AtomicU32::new(0),
+                read_head: AtomicU32::new(0),
+                write_head: AtomicU32::new(initial_capacity),
+                limit_head: AtomicU32::new(initial_capacity),
                 #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
             }),
         };
@@ -70,17 +78,20 @@
     /// Perform an attempted read from the queue. It might be that some producer
     /// is putting something in the queue while this function is executing, and
     /// we don't get to consume it.
-    pub fn read(&mut self) -> Option<T> {
-        let cur_read = self.inner.read_head;
-        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+    pub fn pop(&mut self) -> Option<T> {
         let data_lock = self.inner.data.lock_shared();
+        let cur_read = self.inner.read_head.load(Ordering::Acquire);
+        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+        let buf_size = data_lock.data.cap() as u32;

-        if cur_read != cur_limit {
+        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
             // Make a bitwise copy of the value and return it. The receiver is
             // responsible for dropping it.
             unsafe {
-                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
-                self.inner.read_head += 1;
+                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
+                // We can perform a store since we're the only ones modifying
+                // the atomic.
+                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
                 return Some(std::ptr::read(source));
             }
         } else {
@@ -99,19 +110,27 @@ impl Drop for QueueDynMpsc {
         // There should be no more `QueueDynProducer` pointers to this queue
         dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
         // And so the limit head should be equal to the write head
+        let data_lock = self.inner.data.lock_shared();
         let write_index = self.inner.write_head.load(Ordering::Acquire);
         assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
         // Every item that has not yet been taken out of the queue needs to
-        // have
-        while self.inner.read_head != write_index {
-
+        // have its destructor called. We immediately apply the
+        // increment-by-size trick and walk until we've hit the write head.
+        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
+        read_index += data_lock.data.cap() as u32;
+        while read_index & data_lock.compare_mask != write_index {
+            unsafe {
+                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
+                std::ptr::drop_in_place(target);
+            }
+            read_index += 1;
+        }
     }
 }

-pub struct QueueDynProducer<T> {
-    queue: &'static Shared<T>,
+pub struct QueueDynProducer<T: 'static> {
+    queue: *const Shared<T>,
 }

 impl<T> QueueDynProducer<T> {
@@ -120,15 +139,58 @@
         unsafe {
             // If you only knew the power of the dark side! Obi-Wan never told
             // you what happened to your father!
-            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
+            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
             return Self{ queue };
         }
     }
+
+    pub fn push(&self, value: T) {
+        let queue = unsafe{ &*self.queue };
+
+        let data_lock = queue.data.lock_shared();
+        let read_index = queue.read_head.load(Ordering::Acquire);
+        let write_index = queue.write_head.load(Ordering::Acquire);
+        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
+            let expected_capacity = data_lock.data.cap();
+        }
+    }
+
+    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
+        drop(shared_lock);
+        let queue = unsafe{ &*self.queue };
+
+        {
+            let mut exclusive_lock = queue.data.lock_exclusive();
+
+            // We hold the exclusive lock, but someone else might have done the resizing, and so:
+            if exclusive_lock.data.cap() == expected_capacity {
+                let old_capacity = expected_capacity;
+                let new_capacity = 2 * old_capacity;
+
+                // Resize by a factor of two, and make the two halves identical.
+                exclusive_lock.data.resize(new_capacity);
+                for idx in old_capacity..new_capacity {
+                    unsafe {
+                        let target = exclusive_lock.data.get(idx);
+                        let source = exclusive_lock.data.get(idx - old_capacity);
+                        std::ptr::write(target, std::ptr::read(source));
+                    }
+                }
+
+                // Modify all atomics to reflect that we just resized the
+                // underlying buffer.

+            }
+        }
+
+        // Reacquire shared lock
+        return queue.data.lock_shared();
+    }
 }

 impl<T> Drop for QueueDynProducer<T> {
     fn drop(&mut self) {
-        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
+        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
     }
 }
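
The patch leaves `QueueDynProducer::push` and the atomics update in `resize` unfinished. The sketch below is not taken from this patch; it is a minimal, fixed-capacity model of the same indexing scheme (heads stored in [0, 2*capacity), `compare_mask = 2*capacity - 1`, `read_mask = capacity - 1`) with invented names such as `FixedMpsc` and `slots`. It shows one plausible way a producer can claim a slot with a compare-exchange on `write_head`, write the value, and then publish it in claim order through `limit_head`, while the single consumer pops between `read_head + capacity` and `limit_head`. The memory orderings are choices made for this sketch. Resizing is deliberately omitted: in the real queue the "queue is full" branch would instead call `resize`, which, under the exclusive lock, additionally has to recompute both masks and re-encode the three heads against the doubled capacity after mirroring the old contents into the new half.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU32, Ordering};

// Fixed-capacity model of the index scheme above: capacity is a power of two,
// and all three heads live in [0, 2*capacity). Items still queued when this
// struct is dropped are leaked here; the real queue runs their destructors.
struct FixedMpsc<T> {
    slots: Vec<UnsafeCell<MaybeUninit<T>>>,
    read_head: AtomicU32,  // only advanced by the single consumer
    write_head: AtomicU32, // producers claim slots by bumping this
    limit_head: AtomicU32, // slots between read_head+capacity and this are published
    compare_mask: u32,     // 2*capacity - 1
    read_mask: u32,        // capacity - 1
}

// Producers only touch the atomics and the slot they claimed, so the queue can
// be shared between threads for T: Send.
unsafe impl<T: Send> Sync for FixedMpsc<T> {}

impl<T> FixedMpsc<T> {
    fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two() as u32;
        Self {
            slots: (0..capacity).map(|_| UnsafeCell::new(MaybeUninit::uninit())).collect(),
            read_head: AtomicU32::new(0),
            write_head: AtomicU32::new(capacity),
            limit_head: AtomicU32::new(capacity),
            compare_mask: 2 * capacity - 1,
            read_mask: capacity - 1,
        }
    }

    /// Called by any number of producers. Returns the value when the queue is
    /// full; the dynamic queue in the patch would resize at that point instead.
    fn push(&self, value: T) -> Result<(), T> {
        loop {
            let write = self.write_head.load(Ordering::Acquire);
            let read = self.read_head.load(Ordering::Acquire);
            // Both heads are encoded in [0, 2*capacity), so plain equality
            // means the buffer currently holds `capacity` unconsumed items.
            if write == read {
                return Err(value);
            }
            // Claim the slot at `write` by bumping the write head.
            if self.write_head.compare_exchange(
                write, (write + 1) & self.compare_mask,
                Ordering::AcqRel, Ordering::Acquire,
            ).is_ok() {
                unsafe {
                    (*self.slots[(write & self.read_mask) as usize].get()).write(value);
                }
                // Publish in claim order: earlier claimers advance limit_head
                // first, then we move it past our own slot.
                while self.limit_head.compare_exchange(
                    write, (write + 1) & self.compare_mask,
                    Ordering::AcqRel, Ordering::Acquire,
                ).is_err() {
                    std::hint::spin_loop();
                }
                return Ok(());
            }
        }
    }

    /// Called by the single consumer.
    fn pop(&mut self) -> Option<T> {
        let read = self.read_head.load(Ordering::Acquire);
        let limit = self.limit_head.load(Ordering::Acquire);
        let capacity = self.read_mask + 1;
        if (read + capacity) & self.compare_mask == limit {
            return None; // empty
        }
        let value = unsafe {
            std::ptr::read((*self.slots[(read & self.read_mask) as usize].get()).as_ptr())
        };
        self.read_head.store((read + 1) & self.compare_mask, Ordering::Release);
        Some(value)
    }
}

fn main() {
    let mut queue = FixedMpsc::new(4);
    for value in 0..4 { queue.push(value).unwrap(); }
    assert!(queue.push(99).is_err()); // full: this is where the patched queue wants to resize
    assert_eq!((queue.pop(), queue.pop()), (Some(0), Some(1)));
}

The claim/publish split mirrors the three heads kept by `Shared<T>`: `write_head` serializes slot ownership among producers, `limit_head` tells the consumer how far it may read, and the consumer alone moves `read_head`, which is why `pop` can use a plain store on it, as the patch notes.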