use std::sync::atomic::{AtomicU32, Ordering}; use crate::collections::RawArray; use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard}; /// Multiple-producer single-consumer queue. Generally used in the publicly /// accessible fields of a component. The holder of this struct should be the /// consumer. To retrieve access to the producer-side: call `producer()`. /// /// This is a queue that will resize (indefinitely) if it becomes full, and will /// not shrink. So probably a temporary thing. /// /// In debug mode we'll make sure that there are no producers when the queue is /// dropped. We don't do this in release mode because the runtime is written /// such that components always remain alive (hence, this queue will remain /// accessible) while there are references to it. pub struct QueueDynMpsc { // Entire contents are boxed up such that we can create producers that have // a pointer to the contents. inner: Box> } // One may move around the queue between threads, as long as there is only one // instance of it. unsafe impl Send for QueueDynMpsc{} /// Shared data between queue consumer and the queue producers struct Shared { data: UnfairSeLock>, read_head: AtomicU32, write_head: AtomicU32, limit_head: AtomicU32, #[cfg(debug_assertions)] dbg: AtomicU32, } /// Locked by an exclusive/shared lock. Exclusive lock is obtained when the /// inner data array is resized. struct Inner { data: RawArray, compare_mask: u32, read_mask: u32, } type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner>; impl QueueDynMpsc { /// Constructs a new MPSC queue. Note that the initial capacity is always /// increased to the next power of 2 (if it isn't already). pub fn new(initial_capacity: usize) -> Self { let initial_capacity = initial_capacity.next_power_of_two(); Self::assert_correct_size(initial_capacity); let mut data = RawArray::new(); data.resize(initial_capacity); let initial_capacity = initial_capacity as u32; return Self{ inner: Box::new(Shared { data: UnfairSeLock::new(Inner{ data, compare_mask: (2 * initial_capacity) - 1, read_mask: initial_capacity - 1, }), read_head: AtomicU32::new(0), write_head: AtomicU32::new(initial_capacity), limit_head: AtomicU32::new(initial_capacity), #[cfg(debug_assertions)] dbg: AtomicU32::new(0), }), }; } #[inline] pub fn producer(&self) -> QueueDynProducer { return QueueDynProducer::new(self); } /// Perform an attempted read from the queue. It might be that some producer /// is putting something in the queue while this function is executing, and /// we don't get the consume it. pub fn pop(&mut self) -> Option { let data_lock = self.inner.data.lock_shared(); let cur_read = self.inner.read_head.load(Ordering::Acquire); let cur_limit = self.inner.limit_head.load(Ordering::Acquire); let buf_size = data_lock.data.cap(); if (cur_read + buf_size) & data_lock.compare_mask != cur_limit { // Make a bitwise copy of the value and return it. The receiver is // responsible for dropping it. unsafe { let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize); // We can perform a store since we're the only ones modifying // the atomic. self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release); return Some(std::ptr::read(source)); } } else { return None; } } #[inline] fn assert_correct_size(capacity: usize) { assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2); } } impl Drop for QueueDynMpsc { fn drop(&mut self) { // There should be no more `QueueDynProducer` pointers to this queue dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0)); // And so the limit head should be equal to the write head let data_lock = self.inner.data.lock_shared(); let write_index = self.inner.write_head.load(Ordering::Acquire); assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index); // Every item that has not yet been taken out of the queue needs to // have its destructor called. We immediately apply the // increment-by-size trick and wait until we've hit the write head. let mut read_index = self.inner.read_head.load(Ordering::Acquire); read_index += data_lock.data.cap(); while read_index & data_lock.compare_mask != write_index { unsafe { let target = data_lock.data.get((read_index & data_lock.read_mask) as usize); std::ptr::drop_in_place(target); } read_index += 1; } } } pub struct QueueDynProducer { queue: *const Shared, } impl QueueDynProducer { fn new(consumer: &QueueDynMpsc) -> Self { dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel)); unsafe { // If you only knew the power of the dark side! Obi-Wan never told // you what happened to your father! let queue: *const _ = std::mem::transmute(consumer.inner.as_ref()); return Self{ queue }; } } pub fn push(&self, value: T) { let queue = unsafe{ &*self.queue }; let data_lock = queue.data.lock_shared(); let read_index = queue.read_head.load(Ordering::Acquire); let write_index = queue.write_head.load(Ordering::Acquire); if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing let expected_capacity = data_lock.data.cap(); } } fn resize(&self, shared_lock: InnerRead, expected_capacity: usize) -> InnerRead { drop(shared_lock); let queue = unsafe{ &*self.queue }; { let exclusive_lock = self.queue.data.lock_exclusive(); // We hold the exclusive lock, but someone else might have done the resizing, and so: if exclusive_lock.data.cap() == expected_capacity { let old_capacity = expected_capacity; let new_capacity = 2 * old_capacity; // Resize by a factor of two, and make the two halves identical. exclusive_lock.data.resize(new_capacity); for idx in old_capacity..new_capacity { unsafe { let target = exclusive_lock.data.get(idx); let source = exclusive_lock.data.get(idx - old_capacity); std::ptr::write(target, std::ptr::read(source)); } } // Modify all atomics to reflect that we just resized the // underlying buffer. } } // Reacquire shared lock return queue.data.lock_shared(); } } impl Drop for QueueDynProducer { fn drop(&mut self) { dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) }); } } // producer end is `Send`, because in debug mode we make sure that there are no // more producers when the queue is destroyed. But is not sync, because that // would circumvent our atomic counter shenanigans. unsafe impl Send for QueueDynProducer{}